use super::{ExactParallelSink, FromExactParallelSink};
use std::collections::VecDeque;
use std::mem::ManuallyDrop;
use std::sync::Mutex;
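// Sink-based parallel collection: `new` allocates the entire buffer up
// front, workers fill disjoint slots through a shared reference with
// `push_item`, and `finalize` reassembles the collection without copying.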
impl<T: Send> FromExactParallelSink for Vec<T> {
type Item = T;
type Sink = VecParallelSink<T>;
unsafe fn finalize(sink: Self::Sink) -> Self {
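        // `finalize` requires every slot to be filled; skipped ranges are
        // only legal on the `cancel` path (and are only tracked at all for
        // types with drop glue).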
debug_assert!(sink.skipped.into_inner().unwrap().is_empty());
let base_ptr: *mut T = sink.ptr.get();
let len = sink.len;
let capacity = sink.capacity;
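        // SAFETY: the caller guarantees that every index in 0..len was
        // written exactly once via `push_item`, so the first `len` slots
        // are initialized and `base_ptr`/`capacity` describe the original
        // allocation made in `new`.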
unsafe { Vec::from_raw_parts(base_ptr, len, capacity) }
}
}
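// `Box<[T]>` and `VecDeque<T>` reuse the `Vec` sink: with an exact-size
// source `len == capacity`, so `into_boxed_slice` normally does not need to
// reallocate, and the `Vec`-to-`VecDeque` conversion reuses the buffer.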
impl<T: Send> FromExactParallelSink for Box<[T]> {
type Item = T;
type Sink = VecParallelSink<T>;
unsafe fn finalize(sink: Self::Sink) -> Self {
let vec: Vec<T> = unsafe { FromExactParallelSink::finalize(sink) };
vec.into_boxed_slice()
}
}
impl<T: Send> FromExactParallelSink for VecDeque<T> {
type Item = T;
type Sink = VecParallelSink<T>;
unsafe fn finalize(sink: Self::Sink) -> Self {
let vec: Vec<T> = unsafe { FromExactParallelSink::finalize(sink) };
vec.into()
}
}
/// Write-once output buffer used to collect into `Vec`-backed containers.
#[must_use = "a sink leaks its buffer unless it is finalized or cancelled"]
pub struct VecParallelSink<T: Send> {
    /// Base pointer of the buffer; the sink owns the allocation.
    ptr: MutPtrWrapper<T>,
    /// Number of slots the exact-size source promised to fill.
    len: usize,
    /// Capacity of the underlying allocation (may exceed `len`).
    capacity: usize,
    /// Slot ranges reported as skipped; `cancel` must not drop these.
    /// Only maintained when `T` needs drop.
    skipped: Mutex<Vec<std::ops::Range<usize>>>,
}
impl<T: Send> ExactParallelSink for VecParallelSink<T> {
type Item = T;
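    // Element drops and skip bookkeeping are only needed when `T` has
    // drop glue.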
const NEEDS_CLEANUP: bool = std::mem::needs_drop::<T>();
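    /// Allocates a buffer of `len` uninitialized slots. `ManuallyDrop`
    /// suppresses the `Vec` destructor, so the sink owns the allocation
    /// until `finalize` or `cancel` reconstitutes it.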
fn new(len: usize) -> Self {
let vec: Vec<T> = Vec::with_capacity(len);
let mut vec = ManuallyDrop::new(vec);
let mut_ptr = vec.as_mut_ptr();
let capacity = vec.capacity();
debug_assert_eq!(vec.len(), 0);
Self {
ptr: MutPtrWrapper(mut_ptr),
len,
capacity,
skipped: Mutex::new(Vec::new()),
}
}
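    /// Writes `item` into slot `index` through a shared reference.
    ///
    /// # Safety
    /// `index` must be in bounds, and each index must be written at most
    /// once across all threads; concurrent writers are fine because they
    /// touch disjoint slots.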
unsafe fn push_item(&self, index: usize, item: Self::Item) {
debug_assert!(index < self.len);
let base_ptr: *mut T = self.ptr.get();
let item_ptr: *mut T = unsafe { base_ptr.add(index) };
unsafe { std::ptr::write(item_ptr, item) };
}
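    /// Records `range` as intentionally left unfilled so that `cancel`
    /// will not drop those slots. A no-op when `T` has no drop glue.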
unsafe fn skip_item_range(&self, range: std::ops::Range<usize>) {
if Self::NEEDS_CLEANUP {
debug_assert!(range.start <= range.end);
debug_assert!(range.start <= self.len);
debug_assert!(range.end <= self.len);
self.skipped.lock().unwrap().push(range);
}
}
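    /// Tears the sink down without producing a collection: drops every
    /// element that was written, then frees the allocation.
    ///
    /// # Safety
    /// Each slot in `0..len` must either be initialized or be covered by a
    /// recorded skip range, and the recorded ranges must be disjoint.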
    unsafe fn cancel(self) {
        let base_ptr: *mut T = self.ptr.get();
        if Self::NEEDS_CLEANUP {
            let mut skipped = self.skipped.into_inner().unwrap();
            // Ranges may be recorded in any order; sort so the gaps between
            // them (the initialized slots) can be walked front to back.
            skipped.sort_unstable_by_key(|range| range.start);
            let mut prev = 0..0;
            for range in skipped {
                // SAFETY: the gap between two skipped ranges was written.
                unsafe { Self::cleanup_item_range(base_ptr, self.len, prev.end..range.start) };
                prev = range;
            }
            // SAFETY: the tail after the last skipped range was written.
            unsafe { Self::cleanup_item_range(base_ptr, self.len, prev.end..self.len) };
        }
        // Rebuild the Vec with length 0 so dropping it frees the allocation
        // without touching elements (already dropped or never initialized).
        // SAFETY: `base_ptr` and `capacity` come from `Vec::with_capacity`.
        let vec: Vec<T> = unsafe { Vec::from_raw_parts(base_ptr, 0, self.capacity) };
        drop(vec);
    }
}
impl<T: Send> VecParallelSink<T> {
    /// Drops the initialized elements in `range` of the buffer at `base_ptr`.
    ///
    /// # Safety
    /// Every slot in `range` must hold an initialized `T` that has not been
    /// dropped already.
    unsafe fn cleanup_item_range(base_ptr: *mut T, len: usize, range: std::ops::Range<usize>) {
        if Self::NEEDS_CLEANUP {
            debug_assert!(range.start <= range.end);
            debug_assert!(range.start <= len);
            debug_assert!(range.end <= len);
            let start_ptr: *mut T = unsafe { base_ptr.add(range.start) };
            let slice: *mut [T] =
                std::ptr::slice_from_raw_parts_mut(start_ptr, range.end - range.start);
            // SAFETY: the slots are initialized per the caller's contract.
            unsafe { std::ptr::drop_in_place(slice) };
        }
    }
}
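/// Newtype around the buffer's base pointer so the sink can be shared
/// across threads (`*mut T` is neither `Send` nor `Sync` on its own).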
struct MutPtrWrapper<T>(*mut T);
impl<T> MutPtrWrapper<T> {
fn get(&self) -> *mut T {
self.0
}
}
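// SAFETY: the wrapper only hands out the base pointer; all writes go to
// disjoint slots, which is the caller's obligation on `push_item`. Since
// shared access can move `T` values across threads, `T: Send` is required.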
unsafe impl<T: Send> Sync for MutPtrWrapper<T> {}
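// A minimal, single-threaded sketch of the intended protocol (the real
// parallel driver that splits `0..len` across workers lives elsewhere):
// every index is written exactly once, then the sink is finalized. The
// test name and values are illustrative only.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn fill_then_finalize() {
        let sink = VecParallelSink::<u32>::new(4);
        for i in 0..4 {
            // SAFETY: each index in 0..4 is in bounds and written once.
            unsafe { sink.push_item(i, i as u32 * 10) };
        }
        // SAFETY: every slot was initialized above; nothing was skipped.
        let v: Vec<u32> = unsafe { FromExactParallelSink::finalize(sink) };
        assert_eq!(v, vec![0, 10, 20, 30]);
    }
}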