Skip to main content

MemoryCollection

Struct MemoryCollection 

Source
pub struct MemoryCollection<A> { /* private fields */ }
Expand description

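A partitioned, in-memory collection. Each partition is backed by a `Deferred<Vec<A>>`, and results are produced by calling `run` with a scheduler.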

Implementations§

Source§

impl<A: Any + Send + Sync + Clone> MemoryCollection<A>

Source

pub fn from_defs(vs: Vec<Deferred<Vec<A>>>) -> MemoryCollection<A>

Creates a MemoryCollection from a set of Deferred objects.

Source

pub fn to_defs(&self) -> &Vec<Deferred<Vec<A>>>

Provides raw access to the underlying Deferred objects.

Source

pub fn from_vec(vs: Vec<A>) -> MemoryCollection<A>

Creates a new MemoryCollection, with a single partition, from a Vec of items.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![1,2,3usize]);
  assert_eq!(col.run(&GreedyScheduler::new()), Some(vec![1,2,3usize]));
Source

pub fn n_partitions(&self) -> usize

Returns the current number of data partitions

Source

pub fn concat(&self, other: &MemoryCollection<A>) -> MemoryCollection<A>

Concatenates two collections into a single collection.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let one = MemoryCollection::from_vec(vec![1,2,3usize]);
  let two = MemoryCollection::from_vec(vec![4usize, 5, 6]);
  let cat = one.concat(&two);
  assert_eq!(cat.run(&GreedyScheduler::new()), Some(vec![1,2,3,4,5,6]));
Source

pub fn map<B: Any + Send + Sync + Clone, F: 'static + Sync + Send + Clone + Fn(&A) -> B>( &self, f: F, ) -> MemoryCollection<B>

Maps a function over the values in the MemoryCollection, returning a new MemoryCollection.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let one = MemoryCollection::from_vec(vec![1,2,3usize]);
  let strings = one.map(|i| format!("{}", i));
  assert_eq!(strings.run(&GreedyScheduler::new()), 
    Some(vec!["1".into(),"2".into(),"3".into()]));
Source

pub fn filter<F: 'static + Sync + Send + Clone + Fn(&A) -> bool>( &self, f: F, ) -> MemoryCollection<A>

Filters out items in the collection that fail the predicate.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![1,2,3usize]);
  let odds = col.filter(|x| x % 2 == 1);
  assert_eq!(odds.run(&GreedyScheduler::new()), 
    Some(vec![1, 3usize]));
Source

pub fn split(&self, n_chunks: usize) -> MemoryCollection<A>

Re-partitions a collection into `n_chunks` partitions. Data from each old partition is distributed uniformly across the new partitions.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![1,2,3usize]);
  assert_eq!(col.n_partitions(), 1);
  let two = col.split(2);
  assert_eq!(two.n_partitions(), 2);
Source

pub fn emit<B: Any + Send + Sync + Clone, F: 'static + Sync + Send + Clone + Fn(&A, &mut dyn FnMut(B))>( &self, f: F, ) -> MemoryCollection<B>

Maps over all items in a collection, optionally emitting new values. It can be used to efficiently fuse a number of map/filter/flat_map functions into a single method.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![1,2,3usize]);
  let new = col.emit(|item, emitter| {
    if item % 2 == 0 {
        emitter(format!("{}!", item));
    }
  });
  assert_eq!(new.run(&GreedyScheduler::new()), Some(vec!["2!".into()]));
Source

pub fn emit_to_disk<B: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>, F: 'static + Sync + Send + Clone + Fn(&A, &mut dyn FnMut(B))>( &self, path: String, f: F, ) -> DiskCollection<B>

Maps over all items in a collection, emitting new values. It can be used to efficiently fuse a number of map/filter/flat_map functions into a single method. emit_to_disk differs from `emit` in that it writes the emitted values directly to disk and returns a DiskCollection instead of a MemoryCollection. This makes it convenient to switch to out-of-core processing when needed.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![1,2,3usize]);
  let new = col.emit_to_disk("/tmp".into(), |item, emitter| {
    if item % 2 == 0 {
        emitter(format!("{}!", item));
    }
  });
  assert_eq!(new.run(&GreedyScheduler::new()), Some(vec!["2!".into()]));
Source

pub fn partition<F: 'static + Sync + Send + Clone + Fn(usize, &A) -> usize>( &self, partitions: usize, f: F, ) -> MemoryCollection<A>

Re-partitions data into `partitions` new partitions using the given function. The user-provided function acts as a hash function: its return value, taken modulo the partition count, becomes the partition index. This makes it useful for controlling which partition each item ends up in.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![1,2,3,4usize]);
  let new_col = col.partition(2, |idx, x| if *x < 3 { 1 } else { 2 });
   
  assert_eq!(new_col.n_partitions(), 2);
  assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![3, 4, 1, 2]));
Source

pub fn fold_by<K: Any + Sync + Send + Clone + Hash + Eq, B: Any + Sync + Send + Clone, D: 'static + Sync + Send + Clone + Fn() -> B, F: 'static + Sync + Send + Clone + Fn(&A) -> K, O: 'static + Sync + Send + Clone + Fn(&mut B, &A), R: 'static + Sync + Send + Clone + Fn(&mut B, &B)>( &self, key: F, default: D, binop: O, reduce: R, partitions: usize, ) -> MemoryCollection<(K, B)>

Folds and accumulates values across multiple partitions into `partitions` new partitions. This is also known as a “group by” followed by a reducer.

MemoryCollection first performs a block aggregation: it combines values within each partition using the binop function. It then hashes each key to a new partition index, where the partial results for each key are combined using the reduce function.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![1,2,3,4,5usize]);
  // Sum all odds and evens together
  let group_sum = col.fold_by(|x| x % 2,
                              || 0usize,
                              |block_acc, item| {*block_acc += *item},
                              |part_acc1, part_acc2| {*part_acc1 += *part_acc2},
                              1)
                  .sort_by(|x| x.0);
   
  assert_eq!(group_sum.n_partitions(), 1);
  assert_eq!(group_sum.run(&GreedyScheduler::new()), Some(vec![(0, 6), (1, 9)]));
Source

pub fn partition_by_key<K: Any + Sync + Send + Clone + Hash + Eq, F: 'static + Sync + Send + Clone + Fn(&A) -> K>( &self, n_chunks: usize, key: F, ) -> MemoryCollection<A>

Re-partitions values by a given key. The returned key is hashed, and the hash modulo the new partition count determines which partition each value ends up in.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![1,2,3,4usize]);
  let new_col = col.partition_by_key(2, |x| format!("{}", x));
   
  assert_eq!(new_col.n_partitions(), 2);
  assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 1, 2, 3]));
Source

pub fn sort_by<K: Ord, F: 'static + Sync + Send + Clone + Fn(&A) -> K>( &self, key: F, ) -> MemoryCollection<A>

Sorts values within each partition by a key function. If a global sort is desired, first re-partition the collection into a single partition.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![1,2,3,4i32]);
  let new_col = col.sort_by(|x| -*x);
   
  assert_eq!(new_col.run(&GreedyScheduler::new()), Some(vec![4, 3, 2, 1]));
Source

pub fn join_on<K: Any + Sync + Send + Clone + Hash + Eq, B: Any + Sync + Send + Clone, C: Any + Sync + Send + Clone, KF1: 'static + Sync + Send + Clone + Fn(&A) -> K, KF2: 'static + Sync + Send + Clone + Fn(&B) -> K, J: 'static + Sync + Send + Clone + Fn(&A, &B) -> C>( &self, other: &MemoryCollection<B>, key1: KF1, key2: KF2, joiner: J, partitions: usize, ) -> MemoryCollection<(K, C)>

Inner-joins two collections using the provided key functions. If multiple values share the same key, the result contains their cross product: one joined record for each matching pair.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;

  let name_age: Vec<(String,u32)> = vec![("Andrew".into(), 33), ("Leah".into(), 12)];
  let name_money: Vec<(String,f32)> = vec![("Leah".into(), 20.50)];
   
  let na = MemoryCollection::from_vec(name_age);
  let nm = MemoryCollection::from_vec(name_money);
  let joined = na.join_on(&nm,
                          |nax| nax.0.clone(),
                          |nmx| nmx.0.clone(),
                          |nax, nmx| (nax.0.clone(), nax.1, nmx.1),
                          1);
  assert_eq!(joined.run(&GreedyScheduler::new()), 
          Some(vec![("Leah".into(), ("Leah".into(), 12, 20.50))]));
Source

pub fn run<S: Scheduler>(&self, s: &S) -> Option<Vec<A>>

Executes the collection on the given scheduler, returning the result of the computation.

Source§

impl<A: Any + Send + Sync + Clone> MemoryCollection<Vec<A>>

Source

pub fn flatten(&self) -> MemoryCollection<A>

Flattens a collection of vectors into a collection of their elements.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![vec![1usize,2],vec![3,4]]);
  let flattened = col.flatten();
  assert_eq!(flattened.run(&GreedyScheduler::new()), Some(vec![1, 2, 3, 4]));
Source§

impl<A: Any + Send + Sync + Clone> MemoryCollection<A>

Source

pub fn count(&self) -> MemoryCollection<usize>

Returns a single-element collection containing the number of items in the collection.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![vec![1usize,2],vec![3,4]]);
  assert_eq!(col.count().run(&GreedyScheduler::new()), Some(vec![2]));
  let flattened = col.flatten();
  assert_eq!(flattened.count().run(&GreedyScheduler::new()), Some(vec![4]));
Source§

impl<A: Any + Send + Sync + Clone + PartialEq + Hash + Eq> MemoryCollection<A>

Source

pub fn frequencies(&self, partitions: usize) -> MemoryCollection<(A, usize)>

Computes how many times each distinct item occurs in the collection, returning `(item, count)` pairs spread across `partitions` partitions.

  extern crate tange;
  extern crate tange_collection;
  use tange::scheduler::GreedyScheduler;
  use tange_collection::collection::memory::MemoryCollection;
   
  let col = MemoryCollection::from_vec(vec![1, 2, 1, 5, 1, 2]);
  let freqs = col.frequencies(1).sort_by(|x| x.0);
  assert_eq!(freqs.run(&GreedyScheduler::new()), Some(vec![(1, 3), (2, 2), (5, 1)]));
Source§

impl MemoryCollection<String>

Source

pub fn sink(&self, path: &str) -> MemoryCollection<usize>

Writes each record in a collection to disk, newline delimited. MemoryCollection will create a new file within the path for each partition.

Source§

impl<A: Any + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de>> MemoryCollection<A>

Source

pub fn to_disk(&self, path: String) -> DiskCollection<A>

Copies the MemoryCollection to disk, returning a DiskCollection

Trait Implementations§

Source§

impl<A: Clone> Clone for MemoryCollection<A>

Source§

fn clone(&self) -> MemoryCollection<A>

Returns a duplicate of the value. Read more
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

§

impl<A> Freeze for MemoryCollection<A>

§

impl<A> !RefUnwindSafe for MemoryCollection<A>

§

impl<A> Send for MemoryCollection<A>
where A: Send,

§

impl<A> Sync for MemoryCollection<A>
where A: Sync,

§

impl<A> Unpin for MemoryCollection<A>
where A: Unpin,

§

impl<A> UnsafeUnpin for MemoryCollection<A>

§

impl<A> !UnwindSafe for MemoryCollection<A>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.