use super::{ParallelIterator, IndexedParallelIterator, IntoParallelIterator, ParallelExtend};
use std::collections::LinkedList;
use std::slice;
use std::sync::atomic::{AtomicUsize, Ordering};
mod consumer;
use self::consumer::CollectConsumer;
use super::unzip::unzip_indexed;
mod test;
/// Collects every item of the indexed parallel iterator `pi` into `v`,
/// replacing whatever `v` held before.
///
/// Existing elements of `v` are dropped first. The iterator is then driven
/// into a consumer sized for `pi.len()` items, and the vector's length is
/// updated once the drive has returned.
///
/// # Panics
///
/// Panics if the number of writes counted during the drive differs from
/// `pi.len()` (the check performed by `Collect::complete`).
pub fn collect_into<I, T>(mut pi: I, v: &mut Vec<T>)
where
    I: IndexedParallelIterator<Item = T>,
    T: Send,
{
    // Drop any old contents so the collected items start at index 0.
    v.clear();

    let expected = pi.len();
    let mut target = Collect::new(v, expected);
    pi.drive(target.as_consumer());
    target.complete();
}
/// Appends the items of `pi` to the end of `v`, where the caller promises
/// that exactly `len` items will be written.
///
/// Unlike `collect_into`, the current contents of `v` are kept and the
/// iterator only needs to be a `ParallelIterator`; it is driven through
/// `drive_unindexed`. In this file the only caller passes the value returned
/// by `opt_len()` as `len`.
///
/// # Panics
///
/// Panics if the number of writes counted during the drive differs from
/// `len` (the check performed by `Collect::complete`).
fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>)
where
    I: ParallelIterator<Item = T>,
    T: Send,
{
    let mut target = Collect::new(v, len);
    let consumer = target.as_consumer();
    pi.drive_unindexed(consumer);
    target.complete();
}
/// Unzips the pairs produced by the indexed parallel iterator `pi` into the
/// two vectors `left` and `right`, replacing their previous contents.
///
/// Both vectors are emptied first, then each is given a consumer sized for
/// `pi.len()` items and the pair of consumers is handed to `unzip_indexed`.
///
/// # Panics
///
/// Panics if either side counted a number of writes different from
/// `pi.len()` (the check performed by `Collect::complete`).
pub fn unzip_into<I, A, B>(mut pi: I, left: &mut Vec<A>, right: &mut Vec<B>)
where
    I: IndexedParallelIterator<Item = (A, B)>,
    A: Send,
    B: Send,
{
    // Drop any old contents of both outputs before collecting.
    left.clear();
    right.clear();

    let expected = pi.len();
    let mut left_target = Collect::new(left, expected);
    let mut right_target = Collect::new(right, expected);

    unzip_indexed(pi, left_target.as_consumer(), right_target.as_consumer());

    left_target.complete();
    right_target.complete();
}
/// Bookkeeping for appending a fixed number of items to a borrowed vector.
struct Collect<'c, T>
where
    T: Send + 'c,
{
    /// Counter of writes performed; compared against `len` on completion.
    writes: AtomicUsize,
    /// The vector that receives the collected items.
    vec: &'c mut Vec<T>,
    /// Number of items expected to be appended to `vec`.
    len: usize,
}
impl<'c, T: Send + 'c> Collect<'c, T> {
    /// Wraps `vec`, planning to append exactly `len` new items to it.
    ///
    /// The write counter starts at zero; nothing is reserved yet.
    fn new(vec: &'c mut Vec<T>, len: usize) -> Self {
        Collect {
            vec: vec,
            len: len,
            writes: AtomicUsize::new(0),
        }
    }

    /// Creates a consumer over the `len` slots just past the vector's
    /// current end, reserving capacity for them first.
    fn as_consumer(&mut self) -> CollectConsumer<T> {
        // Make sure the allocation can hold the items about to be written.
        self.vec.reserve(self.len);

        // Slicing from the current length yields an empty slice whose
        // pointer marks the first spare slot.
        let start = self.vec.len();
        let spare = self.vec[start..].as_mut_ptr();

        // SAFETY: the `reserve` call above guarantees capacity for at least
        // `self.len` elements beyond `start`, so the range lies inside the
        // vector's allocation. The slots are not yet initialized.
        // NOTE(review): presumably `CollectConsumer` only writes into this
        // slice and never reads from it; confirm in the `consumer` module.
        let target = unsafe { slice::from_raw_parts_mut(spare, self.len) };

        CollectConsumer::new(&self.writes, target)
    }

    /// Checks that the expected number of writes happened and, if so, grows
    /// the vector's length to cover the newly written items.
    ///
    /// # Panics
    ///
    /// Panics if the write counter does not equal `len`; the vector's length
    /// is left unchanged in that case.
    fn complete(mut self) {
        // NOTE(review): a relaxed load assumes the writers were already
        // synchronized with this thread before `complete` is called; confirm
        // against the drive implementation.
        let actual_writes = self.writes.load(Ordering::Relaxed);
        assert!(
            actual_writes == self.len,
            "expected {} total writes, but got {}",
            self.len,
            actual_writes
        );

        let new_len = self.vec.len() + self.len;

        // SAFETY: the assertion above confirmed that `self.len` writes were
        // counted. This relies on each counted write having initialized a
        // distinct slot of the region handed out by `as_consumer`, which is
        // a contract of the consumer and is not visible in this block.
        unsafe {
            self.vec.set_len(new_len);
        }
    }
}
/// Extends a vector with the items of a parallel iterator.
impl<T> ParallelExtend<T> for Vec<T>
where
    T: Send,
{
    /// Appends every item of `par_iter` to `self`.
    ///
    /// When the iterator reports a length through `opt_len()`, the items are
    /// written directly via `special_extend`. Otherwise they are first
    /// gathered into intermediate vectors and then appended one by one.
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = T>,
    {
        let mut par_iter = par_iter.into_par_iter();

        // Known length: hand off to the direct-write path and stop.
        if let Some(len) = par_iter.opt_len() {
            special_extend(par_iter, len, self);
            return;
        }

        // Unknown length: fold the items into intermediate vectors and
        // gather those vectors in a linked list.
        let chunks: LinkedList<Vec<T>> = par_iter
            .fold(Vec::new, |mut chunk, item| {
                chunk.push(item);
                chunk
            })
            .collect();

        // Reserve once for the combined size, then move each chunk over.
        let total: usize = chunks.iter().map(Vec::len).sum();
        self.reserve(total);
        for mut chunk in chunks {
            self.append(&mut chunk);
        }
    }
}