rspack_parallel/lib.rs
1mod iterator_consumer;
2pub mod scope;
3
4use std::{
5 cell::UnsafeCell,
6 future::Future,
7 iter::ExactSizeIterator,
8 mem::{ManuallyDrop, MaybeUninit},
9};
10
11pub use iterator_consumer::{FutureConsumer, RayonConsumer, TryFutureConsumer};
12pub use scope::scope;
13
14/// par_iter then collect into vec
15///
16/// This is a wrapper around `scope`, but allows non-'static return values.
17///
18/// # Safety
19///
20/// Its safety assumptions are the same as `scope`.
21///
22/// # Example
23///
24/// ```rust
25/// # #[tokio::test]
26/// # async fn foo() {
27/// async fn handle(s: &str) -> (usize, &str) {
28/// (s.len(), s)
29/// }
30///
31/// let data: Vec<String> = vec!["hello".into(), "world".into(), "!".into()];
32/// let tasks = data.iter().map(|s| handle(s));
33///
34/// let list = unsafe { par_iter_then_collect(tasks) };
35///
36/// assert_eq!(list, vec![(5, "hello"), (5, "world"), (1, "!")]);
37/// # }
38/// ```
39pub async unsafe fn par_iter_then_collect<I, F, O>(iter: I) -> Vec<O>
40where
41 I: IntoIterator<Item = F>,
42 I::IntoIter: ExactSizeIterator,
43 F: Future<Output = O> + Send + Sync,
44 O: Send + Sync,
45{
46 // TODO use `std::cell::SyncUnsafeCell`
47 //
48 // see https://github.com/rust-lang/rust/issues/95439
49 #[repr(transparent)]
50 struct SyncUnsafeCell<T: ?Sized>(UnsafeCell<T>);
51
52 // # Safety
53 //
54 // We guarantee that `SyncUnsafeCell` will never be accesse parallel
55 unsafe impl<T: ?Sized + Sync> Sync for SyncUnsafeCell<T> {}
56
57 let iter = iter.into_iter();
58 let output: Box<[MaybeUninit<SyncUnsafeCell<O>>]> = Box::new_uninit_slice(iter.len());
59
60 scope(|token| {
61 for (i, f) in iter.enumerate() {
62 // # Safety
63 //
64 // The caller needs to ensure that the task is legally consumed
65 let spawner = unsafe { token.used((f, &output)) };
66
67 spawner.spawn(move |(f, output)| async move {
68 let result = f.await;
69
70 // # Safety
71 //
72 // This assumes that the length provided by the `ExactSizeIterator` is correct,
73 // and will abort if it is not.
74 let slot = &output[i];
75
76 // # Safety
77 //
78 // because transparent repr
79 let slot = slot.as_ptr().cast::<UnsafeCell<O>>();
80
81 // # Safety
82 //
83 // This slot is exclusive to the thread and
84 // will not be accessed by other threads at the same time.
85 unsafe {
86 UnsafeCell::raw_get(slot).write(result);
87 }
88 });
89 }
90 })
91 .await;
92
93 // # Safety
94 //
95 // `scope` ensures that all slots are initialized after completion
96 let output = unsafe { output.assume_init() };
97 let output = Vec::from(output);
98
99 unsafe {
100 // TODO use into_raw_parts
101 //
102 // see https://github.com/rust-lang/rust/issues/65816
103 let mut output = ManuallyDrop::new(output);
104 let ptr = output.as_mut_ptr();
105 let len = output.len();
106 let cap = output.capacity();
107
108 // # Safety
109 //
110 // because transparent repr
111 let ptr = ptr.cast::<O>();
112 Vec::from_raw_parts(ptr, len, cap)
113 }
114}