Struct tange_collection::collection::memory::MemoryCollection
source · pub struct MemoryCollection<A> { /* private fields */ }
Expand description
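MemoryCollection struct: a partitioned collection whose partitions are held as `Deferred<Vec<A>>` values (see `from_defs` and `to_defs`).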
Implementations§
source§impl<A: Any + Send + Sync + Clone> MemoryCollection<A>
impl<A: Any + Send + Sync + Clone> MemoryCollection<A>
sourcepub fn from_defs(vs: Vec<Deferred<Vec<A>>>) -> MemoryCollection<A>
pub fn from_defs(vs: Vec<Deferred<Vec<A>>>) -> MemoryCollection<A>
Creates a MemoryCollection from a set of Deferred objects.
sourcepub fn to_defs(&self) -> &Vec<Deferred<Vec<A>>>
pub fn to_defs(&self) -> &Vec<Deferred<Vec<A>>>
Provides raw access to the underlying Deferred objects.
sourcepub fn from_vec(vs: Vec<A>) -> MemoryCollection<A>
pub fn from_vec(vs: Vec<A>) -> MemoryCollection<A>
Creates a new MemoryCollection from a Vec of items, stored as a single partition.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![1,2,3usize]);
assert_eq!(col.run(&GreedyScheduler::new()), Some(vec![1,2,3usize]));
sourcepub fn n_partitions(&self) -> usize
pub fn n_partitions(&self) -> usize
Returns the current number of data partitions.
sourcepub fn concat(&self, other: &MemoryCollection<A>) -> MemoryCollection<A>
pub fn concat(&self, other: &MemoryCollection<A>) -> MemoryCollection<A>
Concatenates two collections into a single collection.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let one = MemoryCollection::from_vec(vec![1,2,3usize]);
let two = MemoryCollection::from_vec(vec![4usize, 5, 6]);
let cat = one.concat(&two);
assert_eq!(cat.run(&GreedyScheduler::new()), Some(vec![1,2,3,4,5,6]));
sourcepub fn map<B: Any + Send + Sync + Clone, F: 'static + Sync + Send + Clone + Fn(&A) -> B>(
&self,
f: F
) -> MemoryCollection<B>
pub fn map<B: Any + Send + Sync + Clone, F: 'static + Sync + Send + Clone + Fn(&A) -> B>(
&self,
f: F
) -> MemoryCollection<B>
Maps a function over the values in the MemoryCollection, returning a new MemoryCollection.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let one = MemoryCollection::from_vec(vec![1,2,3usize]);
let strings = one.map(|i| format!("{}", i));
assert_eq!(strings.run(&GreedyScheduler::new()),
Some(vec!["1".into(),"2".into(),"3".into()]));
sourcepub fn filter<F: 'static + Sync + Send + Clone + Fn(&A) -> bool>(
&self,
f: F
) -> MemoryCollection<A>
pub fn filter<F: 'static + Sync + Send + Clone + Fn(&A) -> bool>(
&self,
f: F
) -> MemoryCollection<A>
Filters out items in the collection that fail the predicate.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![1,2,3usize]);
let odds = col.filter(|x| x % 2 == 1);
assert_eq!(odds.run(&GreedyScheduler::new()),
Some(vec![1, 3usize]));
sourcepub fn split(&self, n_chunks: usize) -> MemoryCollection<A>
pub fn split(&self, n_chunks: usize) -> MemoryCollection<A>
Re-partitions a collection by the number of provided chunks. It uniformly distributes data from each old partition into each new partition.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![1,2,3usize]);
assert_eq!(col.n_partitions(), 1);
let two = col.split(2);
assert_eq!(two.n_partitions(), 2);
sourcepub fn emit<B: Any + Send + Sync + Clone, F: 'static + Sync + Send + Clone + Fn(&A, &mut dyn FnMut(B))>(
&self,
f: F
) -> MemoryCollection<B>
pub fn emit<B: Any + Send + Sync + Clone, F: 'static + Sync + Send + Clone + Fn(&A, &mut dyn FnMut(B))>(
&self,
f: F
) -> MemoryCollection<B>
Maps over all items in a collection, optionally emitting new values. It can be used to efficiently fuse a number of map/filter/flat_map functions into a single method.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![1,2,3usize]);
let new = col.emit(|item, emitter| {
if item % 2 == 0 {
emitter(format!("{}!", item));
}
});
assert_eq!(new.run(&GreedyScheduler::new()), Some(vec!["2!".into()]));
sourcepub fn emit_to_disk<B: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>, F: 'static + Sync + Send + Clone + Fn(&A, &mut dyn FnMut(B))>(
&self,
path: String,
f: F
) -> DiskCollection<B>
pub fn emit_to_disk<B: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>, F: 'static + Sync + Send + Clone + Fn(&A, &mut dyn FnMut(B))>(
&self,
path: String,
f: F
) -> DiskCollection<B>
Maps over all items in a collection, emitting new values. It can be used
to efficiently fuse a number of map/filter/flat_map functions into a single method.
`emit_to_disk` differs from `emit` by writing the emitted values directly
to disk, returning a `DiskCollection` instead of a `MemoryCollection`. This makes it convenient to switch to out-of-core processing when needed.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![1,2,3usize]);
let new = col.emit_to_disk("/tmp".into(), |item, emitter| {
if item % 2 == 0 {
emitter(format!("{}!", item));
}
});
assert_eq!(new.run(&GreedyScheduler::new()), Some(vec!["2!".into()]));
sourcepub fn partition<F: 'static + Sync + Send + Clone + Fn(usize, &A) -> usize>(
&self,
partitions: usize,
f: F
) -> MemoryCollection<A>
pub fn partition<F: 'static + Sync + Send + Clone + Fn(usize, &A) -> usize>(
&self,
partitions: usize,
f: F
) -> MemoryCollection<A>
Re-partitions data into `partitions` new partitions using the given function. The user-provided function is used as a hash function: its return value is mapped to a partition index. This makes it useful for controlling which partition each item ends up in.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![1,2,3,4usize]);
let new_col = col.partition(2, |idx, x| if *x < 3 { 1 } else { 2 });
assert_eq!(new_col.n_partitions(), 2);
assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![3, 4, 1, 2]));
sourcepub fn fold_by<K: Any + Sync + Send + Clone + Hash + Eq, B: Any + Sync + Send + Clone, D: 'static + Sync + Send + Clone + Fn() -> B, F: 'static + Sync + Send + Clone + Fn(&A) -> K, O: 'static + Sync + Send + Clone + Fn(&mut B, &A), R: 'static + Sync + Send + Clone + Fn(&mut B, &B)>(
&self,
key: F,
default: D,
binop: O,
reduce: R,
partitions: usize
) -> MemoryCollection<(K, B)>
pub fn fold_by<K: Any + Sync + Send + Clone + Hash + Eq, B: Any + Sync + Send + Clone, D: 'static + Sync + Send + Clone + Fn() -> B, F: 'static + Sync + Send + Clone + Fn(&A) -> K, O: 'static + Sync + Send + Clone + Fn(&mut B, &A), R: 'static + Sync + Send + Clone + Fn(&mut B, &B)>(
&self,
key: F,
default: D,
binop: O,
reduce: R,
partitions: usize
) -> MemoryCollection<(K, B)>
Folds and accumulates values across multiple partitions into `partitions` new partitions. This is also known as a “group by” followed by a reducer.
MemoryCollection first performs a block aggregation: within each partition, it combines the values for each key, starting from the value produced by `default`, using the `binop` function. It then hashes each key to a new partition index, where it merges the per-partition accumulators for that key using the `reduce` function.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![1,2,3,4,5usize]);
// Sum all odds and evens together
let group_sum = col.fold_by(|x| x % 2,
|| 0usize,
|block_acc, item| {*block_acc += *item},
|part_acc1, part_acc2| {*part_acc1 += *part_acc2},
1)
.sort_by(|x| x.0);
assert_eq!(group_sum.n_partitions(), 1);
assert_eq!(group_sum.run(&GreedyScheduler::new()), Some(vec![(0, 6), (1, 9)]));
sourcepub fn partition_by_key<K: Any + Sync + Send + Clone + Hash + Eq, F: 'static + Sync + Send + Clone + Fn(&A) -> K>(
&self,
n_chunks: usize,
key: F
) -> MemoryCollection<A>
pub fn partition_by_key<K: Any + Sync + Send + Clone + Hash + Eq, F: 'static + Sync + Send + Clone + Fn(&A) -> K>(
&self,
n_chunks: usize,
key: F
) -> MemoryCollection<A>
Simple function to re-partition values by a given key. The key returned by `key` is hashed, and the hash is taken modulo the new partition count to determine which partition each value ends up in.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![1,2,3,4usize]);
let new_col = col.partition_by_key(2, |x| format!("{}", x));
assert_eq!(new_col.n_partitions(), 2);
assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 1, 2, 3]));
sourcepub fn sort_by<K: Ord, F: 'static + Sync + Send + Clone + Fn(&A) -> K>(
&self,
key: F
) -> MemoryCollection<A>
pub fn sort_by<K: Ord, F: 'static + Sync + Send + Clone + Fn(&A) -> K>(
&self,
key: F
) -> MemoryCollection<A>
Sorts values within each partition by a key function. If a global sort is desired, the collection must first be re-partitioned into a single partition.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![1,2,3,4i32]);
let new_col = col.sort_by(|x| -*x);
assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 3, 2, 1]));
sourcepub fn join_on<K: Any + Sync + Send + Clone + Hash + Eq, B: Any + Sync + Send + Clone, C: Any + Sync + Send + Clone, KF1: 'static + Sync + Send + Clone + Fn(&A) -> K, KF2: 'static + Sync + Send + Clone + Fn(&B) -> K, J: 'static + Sync + Send + Clone + Fn(&A, &B) -> C>(
&self,
other: &MemoryCollection<B>,
key1: KF1,
key2: KF2,
joiner: J,
partitions: usize
) -> MemoryCollection<(K, C)>
pub fn join_on<K: Any + Sync + Send + Clone + Hash + Eq, B: Any + Sync + Send + Clone, C: Any + Sync + Send + Clone, KF1: 'static + Sync + Send + Clone + Fn(&A) -> K, KF2: 'static + Sync + Send + Clone + Fn(&B) -> K, J: 'static + Sync + Send + Clone + Fn(&A, &B) -> C>(
&self,
other: &MemoryCollection<B>,
key1: KF1,
key2: KF2,
joiner: J,
partitions: usize
) -> MemoryCollection<(K, C)>
Inner joins two collections using the provided key functions. Each result is a `(key, joined_value)` pair. If multiple values share the same key, the result contains the cross product: one joined value for each matching pair.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let name_age: Vec<(String,u32)> = vec![("Andrew".into(), 33), ("Leah".into(), 12)];
let name_money: Vec<(String,f32)> = vec![("Leah".into(), 20.50)];
let na = MemoryCollection::from_vec(name_age);
let nm = MemoryCollection::from_vec(name_money);
let joined = na.join_on(&nm,
|nax| nax.0.clone(),
|nmx| nmx.0.clone(),
|nax, nmx| (nax.0.clone(), nax.1, nmx.1),
1);
assert_eq!(joined.run(&GreedyScheduler::new()),
Some(vec![("Leah".into(), ("Leah".into(), 12, 20.50))]));
source§impl<A: Any + Send + Sync + Clone> MemoryCollection<Vec<A>>
impl<A: Any + Send + Sync + Clone> MemoryCollection<Vec<A>>
sourcepub fn flatten(&self) -> MemoryCollection<A>
pub fn flatten(&self) -> MemoryCollection<A>
Flattens a collection of vectors into a collection of their individual elements.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![vec![1usize,2],vec![3,4]]);
let flattened = col.flatten();
assert_eq!(flattened.run(&GreedyScheduler::new()), Some(vec![1, 2, 3, 4]));
source§impl<A: Any + Send + Sync + Clone> MemoryCollection<A>
impl<A: Any + Send + Sync + Clone> MemoryCollection<A>
sourcepub fn count(&self) -> MemoryCollection<usize>
pub fn count(&self) -> MemoryCollection<usize>
Returns a collection containing a single value: the number of items in the collection.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![vec![1usize,2],vec![3,4]]);
assert_eq!(col.count().run(&GreedyScheduler::new()), Some(vec![2]));
let flattened = col.flatten();
assert_eq!(flattened.count().run(&GreedyScheduler::new()), Some(vec![4]));
source§impl<A: Any + Send + Sync + Clone + PartialEq + Hash + Eq> MemoryCollection<A>
impl<A: Any + Send + Sync + Clone + PartialEq + Hash + Eq> MemoryCollection<A>
sourcepub fn frequencies(&self, partitions: usize) -> MemoryCollection<(A, usize)>
pub fn frequencies(&self, partitions: usize) -> MemoryCollection<(A, usize)>
Computes how often each distinct item occurs in the collection, returning `(item, count)` pairs grouped into `partitions` output partitions.
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::collection::memory::MemoryCollection;
let col = MemoryCollection::from_vec(vec![1, 2, 1, 5, 1, 2]);
let freqs = col.frequencies(1).sort_by(|x| x.0);
assert_eq!(freqs.run(&GreedyScheduler::new()), Some(vec![(1, 3), (2, 2), (5, 1)]));
source§impl MemoryCollection<String>
impl MemoryCollection<String>
sourcepub fn sink(&self, path: &str) -> MemoryCollection<usize>
pub fn sink(&self, path: &str) -> MemoryCollection<usize>
Writes each record in a collection to disk, newline-delimited. A new file is created within `path` for each partition.
source§impl<A: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>> MemoryCollection<A>
impl<A: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>> MemoryCollection<A>
sourcepub fn to_disk(&self, path: String) -> DiskCollection<A>
pub fn to_disk(&self, path: String) -> DiskCollection<A>
Copies the MemoryCollection to disk under `path`, returning a DiskCollection.
Trait Implementations§
source§impl<A: Clone> Clone for MemoryCollection<A>
impl<A: Clone> Clone for MemoryCollection<A>
source§fn clone(&self) -> MemoryCollection<A>
fn clone(&self) -> MemoryCollection<A>
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source
. Read more