Skip to main content

rspack_parallel/
lib.rs

1mod iterator_consumer;
2pub mod scope;
3
4use std::{
5  cell::UnsafeCell,
6  future::Future,
7  iter::ExactSizeIterator,
8  mem::{ManuallyDrop, MaybeUninit},
9};
10
11pub use iterator_consumer::{FutureConsumer, RayonConsumer, TryFutureConsumer};
12pub use scope::scope;
13
14/// par_iter then collect into vec
15///
16/// This is a wrapper around `scope`, but allows non-'static return values.
17///
18/// # Safety
19///
20/// Its safety assumptions are the same as `scope`.
21///
22/// # Example
23///
24/// ```rust
25/// # #[tokio::test]
26/// # async fn foo() {
27/// async fn handle(s: &str) -> (usize, &str) {
28///   (s.len(), s)
29/// }
30///
31/// let data: Vec<String> = vec!["hello".into(), "world".into(), "!".into()];
32/// let tasks = data.iter().map(|s| handle(s));
33///
34/// let list = unsafe { par_iter_then_collect(tasks) };
35///
36/// assert_eq!(list, vec![(5, "hello"), (5, "world"), (1, "!")]);
37/// # }
38/// ```
39pub async unsafe fn par_iter_then_collect<I, F, O>(iter: I) -> Vec<O>
40where
41  I: IntoIterator<Item = F>,
42  I::IntoIter: ExactSizeIterator,
43  F: Future<Output = O> + Send + Sync,
44  O: Send + Sync,
45{
46  // TODO use `std::cell::SyncUnsafeCell`
47  //
48  // see https://github.com/rust-lang/rust/issues/95439
49  #[repr(transparent)]
50  struct SyncUnsafeCell<T: ?Sized>(UnsafeCell<T>);
51
52  // # Safety
53  //
54  // We guarantee that `SyncUnsafeCell` will never be accesse parallel
55  unsafe impl<T: ?Sized + Sync> Sync for SyncUnsafeCell<T> {}
56
57  let iter = iter.into_iter();
58  let output: Box<[MaybeUninit<SyncUnsafeCell<O>>]> = Box::new_uninit_slice(iter.len());
59
60  scope(|token| {
61    for (i, f) in iter.enumerate() {
62      // # Safety
63      //
64      // The caller needs to ensure that the task is legally consumed
65      let spawner = unsafe { token.used((f, &output)) };
66
67      spawner.spawn(move |(f, output)| async move {
68        let result = f.await;
69
70        // # Safety
71        //
72        // This assumes that the length provided by the `ExactSizeIterator` is correct,
73        // and will abort if it is not.
74        let slot = &output[i];
75
76        // # Safety
77        //
78        // because transparent repr
79        let slot = slot.as_ptr().cast::<UnsafeCell<O>>();
80
81        // # Safety
82        //
83        // This slot is exclusive to the thread and
84        // will not be accessed by other threads at the same time.
85        unsafe {
86          UnsafeCell::raw_get(slot).write(result);
87        }
88      });
89    }
90  })
91  .await;
92
93  // # Safety
94  //
95  // `scope` ensures that all slots are initialized after completion
96  let output = unsafe { output.assume_init() };
97  let output = Vec::from(output);
98
99  unsafe {
100    // TODO use into_raw_parts
101    //
102    // see https://github.com/rust-lang/rust/issues/65816
103    let mut output = ManuallyDrop::new(output);
104    let ptr = output.as_mut_ptr();
105    let len = output.len();
106    let cap = output.capacity();
107
108    // # Safety
109    //
110    // because transparent repr
111    let ptr = ptr.cast::<O>();
112    Vec::from_raw_parts(ptr, len, cap)
113  }
114}