vec_parallel/lib.rs
1// SPDX-License-Identifier: MIT OR Apache-2.0
2/*!
3
4
5
6A library for building vectors in parallel using async tasks.
7
8This crate provides an efficient, executor-agnostic way to construct `Vec<T>` by dividing
9the work into multiple async tasks that can run concurrently. It's particularly useful for
10CPU-bound initialization tasks where elements can be computed independently.
11
12# Overview
13
14vec_parallel allows you to parallelize the construction of vectors by splitting the work
15across multiple async tasks. Each task is responsible for computing a portion of the vector,
16writing directly to the final memory location to avoid unnecessary copies.
17
18# Key Features
19
20- **Flexible parallelization strategies**: Control task creation with [`Strategy`]
21- **Zero-copy construction**: Elements are written directly to their final location
22- **Executor-agnostic**: Works with any async runtime (tokio, async-std, smol, etc.)
23- **Optional executor integration**: Use the `some_executor` feature for convenient spawning
24- **WASM support**: Works in browser environments with wasm-bindgen
25- **Safe abstraction**: Careful use of unsafe code with documented invariants
26
27# Usage Patterns
28
29## Basic Usage
30
31```
32use vec_parallel::{build_vec, Strategy};
33
34// Build a vector of squares using multiple tasks
35let builder = build_vec(100, Strategy::TasksPerCore(4), |i| i * i);
36
37// Run the tasks (in a real application, these would be spawned on an executor)
38for task in builder.tasks {
39 test_executors::spin_on(task);
40}
41
42// Get the final result
43let squares = test_executors::spin_on(builder.result);
44assert_eq!(squares[10], 100); // 10² = 100
45```
46
47## With Async Executors
48
49```
50use vec_parallel::{build_vec, Strategy};
51
52# async fn example() {
53// With tokio (or any async runtime)
54let builder = build_vec(1000, Strategy::TasksPerCore(4), |i| {
55 // Expensive computation
56 (0..100).map(|j| (i + j) * 2).sum::<usize>()
57});
58
59// Spawn tasks on your executor
60for task in builder.tasks {
61 // In a real app: tokio::spawn(task);
62 test_executors::spin_on(task);
63}
64
65// Await the result
66let result = test_executors::spin_on(builder.result);
67assert_eq!(result.len(), 1000);
68# }
69# test_executors::spin_on(example());
70```
71
72## Choosing a Strategy
73
74The [`Strategy`] enum controls how work is divided:
75
76- [`Strategy::One`]: No parallelism, single task
77- [`Strategy::Tasks`]: Exactly `n` tasks
78- [`Strategy::Max`]: One task per element (maximum parallelism)
79- [`Strategy::TasksPerCore`]: `n` tasks per CPU core (recommended for CPU-bound work)
80
81# Performance Considerations
82
83- For CPU-bound work, use `Strategy::TasksPerCore(4)` to `Strategy::TasksPerCore(8)`
84- For I/O-bound work, consider higher task counts
85- For small vectors (<100 elements), parallelization overhead may not be worth it
86- The library uses atomic operations for synchronization, avoiding locks
87
88# Safety
89
90This library uses `unsafe` code internally for performance, but maintains safety through:
91
92- Non-overlapping slice assignments for each task
93- Atomic counters for task completion tracking
94- Careful lifetime management with `Arc` and `Weak` references
95- All unsafe operations are documented with their safety invariants
96
97# Optional Features
98
99- `some_executor`: Enables integration with the `some_executor` crate for convenient task spawning
100*/
101
102use atomic_waker::AtomicWaker;
103/// Re-export the some_executor crate when the feature is enabled.
104///
105/// This provides access to executor utilities for convenient task spawning.
106#[cfg(feature = "some_executor")]
107pub use some_executor;
108use std::future::Future;
109use std::mem::MaybeUninit;
110use std::pin::Pin;
111use std::sync::atomic::AtomicUsize;
112use std::sync::{Arc, Weak};
113use std::task::{Context, Poll};
114
/// Controls how the work of building a vector is split into tasks.
///
/// Each variant is a policy for how many [`SliceTask`]s [`build_vec`] creates;
/// the elements themselves are always divided into contiguous,
/// non-overlapping index ranges.
///
/// # Examples
///
/// ```
/// use vec_parallel::{build_vec, Strategy};
///
/// // Sequential: one task builds everything.
/// let builder = build_vec(10, Strategy::One, |i| i * 2);
/// # for task in builder.tasks { test_executors::spin_on(task); }
/// # let _ = test_executors::spin_on(builder.result);
///
/// // Exactly four tasks.
/// let builder = build_vec(100, Strategy::Tasks(4), |i| i * 2);
/// # for task in builder.tasks { test_executors::spin_on(task); }
/// # let _ = test_executors::spin_on(builder.result);
///
/// // One task per element.
/// let builder = build_vec(10, Strategy::Max, |i| i * 2);
/// # for task in builder.tasks { test_executors::spin_on(task); }
/// # let _ = test_executors::spin_on(builder.result);
///
/// // Four tasks per CPU core.
/// let builder = build_vec(1000, Strategy::TasksPerCore(4), |i| i * 2);
/// # for task in builder.tasks { test_executors::spin_on(task); }
/// # let _ = test_executors::spin_on(builder.result);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Strategy {
    /// A single task builds the entire vector.
    ///
    /// Effectively disables parallelism; equivalent to building the vector
    /// sequentially.
    One,
    /// Exactly this many tasks, each handling an even share of the elements.
    ///
    /// A count larger than the number of elements is reduced automatically.
    ///
    /// # Example
    ///
    /// ```
    /// # use vec_parallel::{build_vec, Strategy};
    /// // Four tasks over 100 elements: 25 elements each.
    /// let builder = build_vec(100, Strategy::Tasks(4), |i| i);
    /// assert_eq!(builder.tasks.len(), 4);
    ///
    /// for task in builder.tasks {
    ///     test_executors::spin_on(task);
    /// }
    /// let result = test_executors::spin_on(builder.result);
    /// assert_eq!(result.len(), 100);
    /// assert_eq!(result[0], 0);
    /// assert_eq!(result[99], 99);
    /// ```
    Tasks(usize),
    /// One task per element — the finest possible granularity.
    ///
    /// Maximizes parallelism at the cost of per-task overhead, so it may be
    /// a poor fit for very cheap element computations.
    ///
    /// # Example
    ///
    /// ```
    /// # use vec_parallel::{build_vec, Strategy};
    /// let builder = build_vec(10, Strategy::Max, |i| i);
    /// assert_eq!(builder.tasks.len(), 10);
    /// ```
    Max,
    /// `multiplier * num_cpus::get()` tasks.
    ///
    /// A good default for CPU-bound workloads; multipliers of 4–8 are a
    /// common balance between scheduling flexibility and overhead.
    ///
    /// # Example
    ///
    /// ```
    /// # use vec_parallel::{build_vec, Strategy};
    /// let builder = build_vec(1000, Strategy::TasksPerCore(4), |i| i);
    /// assert_eq!(builder.tasks.len(), 4 * num_cpus::get());
    ///
    /// for task in builder.tasks {
    ///     test_executors::spin_on(task);
    /// }
    /// let result = test_executors::spin_on(builder.result);
    /// assert_eq!(result.len(), 1000);
    /// for (i, &val) in result.iter().enumerate() {
    ///     assert_eq!(val, i);
    /// }
    /// ```
    TasksPerCore(usize),
}
219
/// Shared state for coordinating task completion.
///
/// One `SharedWaker` is shared (via `Arc`) among all [`SliceTask`]s and the
/// [`VecResult`] so the result future can learn when every task has finished.
///
/// Protocol: each task decrements `outstanding_tasks` with `Release` ordering
/// after writing its range; the task that observes the counter dropping from
/// 1 to 0 wakes `waker`. `VecResult::poll` reads the counter with `Acquire`
/// ordering, so all element writes happen-before the final vector is
/// assembled.
#[derive(Debug)]
struct SharedWaker {
    /// Number of tasks that haven't completed yet.
    outstanding_tasks: AtomicUsize,
    /// Waker to notify when all tasks complete.
    waker: AtomicWaker,
}
231
/// A future that builds a slice of the final vector.
///
/// Each `SliceTask` is responsible for computing and storing elements
/// for a specific range of indices. Multiple tasks can run concurrently
/// to build different parts of the vector in parallel.
///
/// A task completes in a single poll (it never returns `Pending`) and must
/// not be polled or `run` again afterwards — the `poison` flag enforces this
/// with a panic.
///
/// # Example
///
/// ```
/// use vec_parallel::{build_vec, Strategy};
///
/// let builder = build_vec(20, Strategy::Tasks(2), |i| i * 3);
///
/// // Each task handles a portion of the vector
/// assert_eq!(builder.tasks.len(), 2);
///
/// // Tasks can be polled independently
/// for mut task in builder.tasks {
///     test_executors::spin_on(task);
/// }
/// ```
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SliceTask<T, B> {
    /// A function that builds the value at each index in the task's range.
    build: B,
    /// A weak reference to the shared vector storage.
    /// Weak (not strong) so the storage can be freed if the result and all
    /// tasks are dropped; it is upgraded for the duration of each run, which
    /// keeps the allocation alive while the task writes.
    own: Weak<Vec<MaybeUninit<T>>>,
    /// The starting index (inclusive) for this task's range.
    start: usize,
    /// The ending index (exclusive) for this task's range.
    past_end: usize,
    /// Shared synchronization state with other tasks and the result future.
    shared_waker: Arc<SharedWaker>,
    /// Flag to prevent double execution of the task.
    poison: bool,
}
270
/// `SliceTask` is `Send` when both `T` and `B` are `Send`.
///
/// # Safety
///
/// This is safe because:
/// - Each task only writes to its own `[start, past_end)` range of the
///   shared vector, and `build_vec` hands out non-overlapping ranges
/// - The `SharedWaker` uses atomic operations for synchronization
/// - The builder function `B` is owned by (moved into) the task
/// - `T: Send` is required because produced `T` values may cross threads
///
/// NOTE(review): no corresponding `Sync` impl is provided; tasks appear to be
/// designed to be driven from one thread at a time — confirm before adding one.
unsafe impl<T, B> Send for SliceTask<T, B>
where
    T: Send,
    B: Send,
{
}
285
286impl<T, B> SliceTask<T, B>
287where
288 B: FnMut(usize) -> T,
289{
290 /// Executes this task synchronously, building all elements in its range.
291 ///
292 /// This method is an alternative to polling the future and is useful when
293 /// you want to run tasks on specific threads or in a blocking context.
294 ///
295 /// # Panics
296 ///
297 /// Panics if called after the task has already completed.
298 ///
299 /// # Example
300 ///
301 /// ```
302 /// use vec_parallel::{build_vec, Strategy};
303 ///
304 /// let mut builder = build_vec(10, Strategy::One, |i| i * 2);
305 ///
306 /// // Run the task manually
307 /// builder.tasks[0].run();
308 ///
309 /// let result = test_executors::spin_on(builder.result);
310 /// assert_eq!(result, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
311 /// ```
312 pub fn run(&mut self) {
313 assert!(!self.poison, "Polling after completion");
314 let own = self.own.upgrade().expect("SliceTask was deallocated");
315 for i in self.start..self.past_end {
316 unsafe {
317 let ptr = own.as_ptr();
318 // assuming our slices are nonoverlapping, we can write to the slice
319 let ptr = ptr as *mut MaybeUninit<T>;
320 ptr.add(i).write(MaybeUninit::new((self.build)(i)));
321 }
322 }
323 let old = self
324 .shared_waker
325 .outstanding_tasks
326 .fetch_sub(1, std::sync::atomic::Ordering::Release);
327 if old == 1 {
328 self.shared_waker.waker.wake();
329 }
330 self.poison = true;
331 }
332
333 /// Helper function to run the task in a depinned state.
334 ///
335 /// # Safety
336 ///
337 /// This function is unsafe because:
338 /// - It assumes `vec_base` points to valid, allocated memory
339 /// - It assumes no other tasks write to the range `[start, past_end)`
340 /// - It assumes the vec won't be deallocated during execution
341 /// - Caller must ensure `poison` is set to prevent double execution
342 unsafe fn run_depinned(
343 start: usize,
344 past_end: usize,
345 build: &mut B,
346 vec_base: *mut MaybeUninit<T>,
347 shared_waker: &Arc<SharedWaker>,
348 poison: &mut bool,
349 ) {
350 for i in start..past_end {
351 unsafe {
352 // assuming our slices are nonoverlapping, we can write to the slice
353 let ptr = vec_base.add(i);
354 ptr.write(MaybeUninit::new(build(i)));
355 }
356 }
357 let old = shared_waker
358 .outstanding_tasks
359 .fetch_sub(1, std::sync::atomic::Ordering::Release);
360 if old == 1 {
361 shared_waker.waker.wake();
362 }
363 *poison = true;
364 }
365}
366
367impl<T, B> Future for SliceTask<T, B>
368where
369 B: FnMut(usize) -> T,
370{
371 type Output = ();
372
373 /// Polls the task to completion.
374 ///
375 /// This method computes all elements in the task's assigned range and
376 /// writes them directly to the shared vector. Once complete, it decrements
377 /// the outstanding task counter and wakes the result future if this was
378 /// the last task.
379 ///
380 /// # Panics
381 ///
382 /// Panics if polled after completion (poison flag is set).
383 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
384 assert!(!self.poison, "Polling after completion");
385 //pin-project
386 let (start, past_end, build, weak_own, shared_waker, poison) = unsafe {
387 let s = self.get_unchecked_mut();
388 let start = &mut s.start;
389 let past_end = &mut s.past_end;
390 let build = &mut s.build;
391 let weak_own = &mut s.own;
392 let shared_waker = &mut s.shared_waker;
393 (
394 start,
395 past_end,
396 build,
397 weak_own,
398 shared_waker,
399 &mut s.poison,
400 )
401 };
402 unsafe {
403 let safe_arc = weak_own.upgrade().expect("SliceTask was deallocated");
404 // assuming our slices are nonoverlapping, we can write to the slice
405 let ptr = safe_arc.as_ptr() as *mut MaybeUninit<T>;
406 Self::run_depinned(*start, *past_end, build, ptr, shared_waker, poison);
407 }
408 Poll::Ready(())
409 }
410}
411
/// A future that resolves to the completed vector.
///
/// This future waits for all [`SliceTask`]s to complete and then assembles
/// the final vector. It uses atomic operations to track task completion
/// without requiring locks.
///
/// # Example
///
/// ```
/// use vec_parallel::{build_vec, Strategy};
///
/// let builder = build_vec(5, Strategy::Max, |i| format!("Item {}", i));
///
/// // Complete all tasks
/// for task in builder.tasks {
///     test_executors::spin_on(task);
/// }
///
/// // Get the final vector
/// let result = test_executors::spin_on(builder.result);
/// assert_eq!(result[0], "Item 0");
/// assert_eq!(result[4], "Item 4");
/// ```
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct VecResult<I> {
    /// The shared backing storage. `Some` until the future completes; taken
    /// on completion, so polling again afterwards panics.
    vec: Option<Arc<Vec<MaybeUninit<I>>>>,
    /// Completion state shared with every `SliceTask`.
    shared_waker: Arc<SharedWaker>,
}
441impl<I> Future for VecResult<I> {
442 type Output = Vec<I>;
443
444 /// Polls the result future, returning the completed vector when all tasks finish.
445 ///
446 /// This method checks if all tasks have completed (outstanding_tasks == 0).
447 /// When they have, it converts the `Vec<MaybeUninit<I>>` to `Vec<I>` and
448 /// returns it. Otherwise, it registers the waker and returns `Pending`.
449 ///
450 /// # Safety
451 ///
452 /// The conversion from `MaybeUninit<I>` to `I` is safe because we only
453 /// perform it after all tasks have completed, guaranteeing all elements
454 /// are initialized.
455 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
456 self.shared_waker.waker.register(cx.waker());
457 if self
458 .shared_waker
459 .outstanding_tasks
460 .load(std::sync::atomic::Ordering::Acquire)
461 == 0
462 {
463 let v = self.vec.take().expect("Polling after completion");
464 let mut v = Arc::into_inner(v).expect("Too many references");
465 let ptr = v.as_mut_ptr() as *mut I;
466 let len = v.len();
467 let cap = v.capacity();
468 // SAFETY:
469 // 1. We got ptr/len/cap from a valid Vec
470 // 2. MaybeUninit<T> and T have the same size and alignment
471 // 3. We check for double-free
472 let f = unsafe {
473 let f = Vec::from_raw_parts(ptr, len, cap);
474 std::mem::forget(v);
475 f
476 };
477 Poll::Ready(f)
478 } else {
479 Poll::Pending
480 }
481 }
482}
483
/// Contains the tasks and result future for building a vector in parallel.
///
/// A `VecBuilder` is created by [`build_vec`] and contains:
/// - `tasks`: Individual futures that build portions of the vector
/// - `result`: A future that resolves to the completed vector
///
/// # Usage Pattern
///
/// 1. Create a builder with [`build_vec`]
/// 2. Spawn or poll the tasks (potentially on different threads)
/// 3. Await the result to get the final vector
///
/// Note that `result` only completes after *every* task has been driven to
/// completion; awaiting it without running the tasks will pend forever.
///
/// # Example
///
/// ```
/// use vec_parallel::{build_vec, Strategy};
///
/// async fn build_squares() -> Vec<u32> {
///     let builder = build_vec(10, Strategy::TasksPerCore(2), |i| {
///         // Simulate expensive computation
///         (i as u32) * (i as u32)
///     });
///
///     // In a real application, spawn tasks on your executor
///     for task in builder.tasks {
///         // tokio::spawn(task);
///         # test_executors::spin_on(task);
///     }
///
///     // Wait for completion
///     # test_executors::spin_on(builder.result)
///     // builder.result.await
/// }
/// # test_executors::spin_on(build_squares());
/// ```
#[derive(Debug)]
pub struct VecBuilder<I, B> {
    /// The individual tasks that build portions of the vector.
    ///
    /// These can be spawned on an executor or polled manually.
    pub tasks: Vec<SliceTask<I, B>>,
    /// A future that resolves to the completed vector.
    ///
    /// This should be awaited after all tasks have been spawned.
    pub result: VecResult<I>,
}
530
/// Re-export of `some_executor::hint::Hint` for convenient access.
///
/// Available when the `some_executor` feature is enabled. Lets downstream
/// code name this type via `vec_parallel::Hint` without depending on
/// `some_executor` directly.
#[cfg(feature = "some_executor")]
pub type Hint = some_executor::hint::Hint;

/// Re-export of `some_executor::Priority` for convenient access.
///
/// Available when the `some_executor` feature is enabled. Lets downstream
/// code name this type via `vec_parallel::Priority` without depending on
/// `some_executor` directly.
#[cfg(feature = "some_executor")]
pub type Priority = some_executor::Priority;
542
#[cfg(feature = "some_executor")]
impl<I, B> VecBuilder<I, B> {
    /// Spawns all tasks on the provided executor and awaits the result.
    ///
    /// This is a convenience method that handles task spawning and result
    /// awaiting in one call. The tasks are spawned with the specified
    /// priority and hint.
    ///
    /// # Arguments
    ///
    /// * `executor` - The executor to spawn tasks on
    /// * `priority` - Task priority for scheduling
    /// * `hint` - Execution hint for the executor
    ///
    /// # Example
    ///
    /// ```
    /// # #[cfg(feature = "some_executor")]
    /// # {
    /// use vec_parallel::{build_vec, Strategy};
    ///
    /// # // Mock executor setup for testing
    /// # use std::future::Future;
    /// # use std::pin::Pin;
    /// # use std::task::{Context, Poll, Waker};
    /// # use std::sync::{Arc, Mutex};
    /// # struct MockExecutor {
    /// #     tasks: Arc<Mutex<Vec<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
    /// # }
    /// # impl MockExecutor {
    /// #     fn new() -> Self {
    /// #         Self { tasks: Arc::new(Mutex::new(Vec::new())) }
    /// #     }
    /// #     fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
    /// #         self.tasks.lock().unwrap().push(Box::pin(future));
    /// #     }
    /// # }
    ///
    /// # async fn example() {
    /// // Example with a real executor that implements SomeExecutor
    /// // In practice, you would use your async runtime's spawning mechanism
    ///
    /// let builder = build_vec(100, Strategy::TasksPerCore(4), |i| i * i);
    ///
    /// // Spawn tasks on the current executor
    /// // This example shows how you would integrate with tokio:
    /// /*
    /// for task in builder.tasks {
    ///     tokio::spawn(task);
    /// }
    /// let squares = builder.result.await;
    /// */
    ///
    /// // Or with the spawn_on method when using some_executor feature:
    /// // let squares = builder.spawn_on(
    /// //     &mut executor,
    /// //     some_executor::Priority::default(),
    /// //     some_executor::hint::Hint::default()
    /// // ).await;
    ///
    /// # // For testing, run tasks synchronously
    /// # let mut builder = build_vec(100, Strategy::TasksPerCore(4), |i| i * i);
    /// # for task in builder.tasks {
    /// #     test_executors::spin_on(task);
    /// # }
    /// # let squares = test_executors::spin_on(builder.result);
    /// # assert_eq!(squares[10], 100);
    /// # }
    /// # test_executors::spin_on(example());
    /// # }
    /// ```
    pub async fn spawn_on<E: some_executor::SomeExecutor>(
        mut self,
        executor: &mut E,
        priority: some_executor::Priority,
        hint: some_executor::hint::Hint,
    ) -> Vec<I>
    where
        I: Send,
        B: FnMut(usize) -> I,
        B: Send,
        I: 'static,
        B: 'static,
    {
        use some_executor::task::{ConfigurationBuilder, Task};

        // One configuration is built up front and cloned per task so every
        // task is spawned with the same priority/hint.
        let configuration = ConfigurationBuilder::new()
            .priority(priority)
            .hint(hint)
            .build();

        let mut observers = Vec::with_capacity(self.tasks.len());
        for (t, task) in self.tasks.drain(..).enumerate() {
            // Label each spawned task with its index for debugging/tracing.
            let label = format!("VecBuilder task {}", t);
            let t = Task::without_notifications(label, configuration.clone(), task);
            let o = executor.spawn(t);
            observers.push(o);
        }
        // The observers are dropped here without being awaited: completion is
        // tracked by the shared counter inside `result` instead.
        // NOTE(review): assumes dropping an observer does not cancel the
        // spawned task — confirm against some_executor's observer contract.
        self.result.await
    }
}
644
645/// Creates a builder for constructing a vector in parallel.
646///
647/// This function divides the work of building a vector into multiple async tasks
648/// based on the specified strategy. Each task computes elements for a range of
649/// indices using the provided closure.
650///
651/// # Arguments
652///
653/// * `len` - The length of the vector to create
654/// * `strategy` - How to divide the work among tasks (see [`Strategy`])
655/// * `f` - A closure that computes the element at a given index
656///
657/// # Type Parameters
658///
659/// * `R` - The element type of the resulting vector
660/// * `B` - The closure type (must be `FnMut(usize) -> R` and `Clone`)
661///
662/// # Implementation Details
663///
664/// The function pre-allocates a `Vec<MaybeUninit<R>>` of the requested size,
665/// wraps it in an `Arc` for sharing among tasks, and creates `SliceTask`s
666/// that each handle a non-overlapping range of indices. Task completion is
667/// tracked via atomic operations.
668///
669/// # Examples
670///
671/// ## Basic usage
672///
673/// ```
674/// use vec_parallel::{build_vec, Strategy};
675///
676/// // Build a vector of squares
677/// let builder = build_vec(10, Strategy::Tasks(2), |i| i * i);
678///
679/// // Execute tasks
680/// for task in builder.tasks {
681/// test_executors::spin_on(task);
682/// }
683///
684/// // Get result
685/// let squares = test_executors::spin_on(builder.result);
686/// assert_eq!(squares, vec![0, 1, 4, 9, 16, 25, 36, 49, 64, 81]);
687/// ```
688///
689/// ## Complex computation with shared state
690///
691/// ```
692/// use vec_parallel::{build_vec, Strategy};
693/// use std::sync::Arc;
694///
695/// // Shared configuration for all tasks
696/// let config = Arc::new(vec![1.0, 2.0, 3.0, 4.0]);
697/// let config_clone = config.clone();
698///
699/// let builder = build_vec(8, Strategy::Tasks(2), move |i| {
700/// // Each task gets its own clone of the Arc
701/// let sum: f64 = config_clone.iter().map(|&x| x * (i as f64)).sum();
702/// sum
703/// });
704///
705/// for task in builder.tasks {
706/// test_executors::spin_on(task);
707/// }
708///
709/// let result = test_executors::spin_on(builder.result);
710/// assert_eq!(result[0], 0.0); // 0 * (1+2+3+4)
711/// assert_eq!(result[1], 10.0); // 1 * (1+2+3+4)
712/// ```
713///
714/// ## With expensive computation
715///
716/// ```
717/// use vec_parallel::{build_vec, Strategy};
718///
719/// fn expensive_computation(n: usize) -> u64 {
720/// // Simulate expensive work
721/// (0..1000).map(|i| (n + i) as u64).sum()
722/// }
723///
724/// let builder = build_vec(100, Strategy::TasksPerCore(4), expensive_computation);
725///
726/// // Track which tasks complete first
727/// let mut completed = 0;
728/// for mut task in builder.tasks {
729/// task.run();
730/// completed += 1;
731/// }
732/// assert_eq!(completed, 4 * num_cpus::get());
733///
734/// let result = test_executors::spin_on(builder.result);
735/// assert_eq!(result.len(), 100);
736/// assert_eq!(result[0], expensive_computation(0));
737/// ```
738///
739/// ## Stateful closures
740///
741/// ```
742/// use vec_parallel::{build_vec, Strategy};
743///
744/// let offset = 100;
745/// let builder = build_vec(5, Strategy::Max, move |i| i + offset);
746///
747/// # for task in builder.tasks { test_executors::spin_on(task); }
748/// let result = test_executors::spin_on(builder.result);
749/// assert_eq!(result, vec![100, 101, 102, 103, 104]);
750/// ```
751/// # Returns
752///
753/// A [`VecBuilder`] containing the tasks to execute and a result future.
754///
755/// # Panics
756///
757/// This function does not panic. If more tasks are requested than elements,
758/// the task count is automatically reduced.
759pub fn build_vec<R, B>(len: usize, strategy: Strategy, f: B) -> VecBuilder<R, B>
760where
761 B: FnMut(usize) -> R,
762 B: Clone,
763{
764 let mut vec = Vec::with_capacity(len);
765 vec.resize_with(len, MaybeUninit::uninit);
766
767 let vec_arc = Arc::new(vec);
768 match strategy {
769 Strategy::One => build_vec(len, Strategy::Tasks(1), f),
770 Strategy::Tasks(tasks) => {
771 if tasks > len {
772 return build_vec(len, Strategy::Tasks(len), f);
773 }
774 let shared_waker = Arc::new(SharedWaker {
775 outstanding_tasks: AtomicUsize::new(tasks),
776 waker: AtomicWaker::new(),
777 });
778 let mut task_vec = Vec::with_capacity(tasks);
779 let chunk = len / tasks;
780 for i in 0..tasks {
781 let start = i * chunk;
782 let end = if i + 1 == tasks { len } else { start + chunk };
783 let task = SliceTask {
784 build: f.clone(),
785 own: Arc::downgrade(&vec_arc),
786 start,
787 past_end: end,
788 shared_waker: shared_waker.clone(),
789 poison: false,
790 };
791 task_vec.push(task);
792 }
793 let result = VecResult {
794 vec: Some(vec_arc),
795 shared_waker,
796 };
797 VecBuilder {
798 tasks: task_vec,
799 result,
800 }
801 }
802 Strategy::Max => build_vec(len, Strategy::Tasks(len), f),
803 Strategy::TasksPerCore(tasks_per_core) => {
804 let tasks = tasks_per_core * num_cpus::get();
805 build_vec(len, Strategy::Tasks(tasks), f)
806 }
807 }
808}
809
810// Implementation notes for trait implementations:
811//
812// VecBuilder trait implementations
813// Clone: Not implemented. VecBuilder contains mutable tasks with shared synchronization state.
814// Cloning would create confusing semantics around task execution and completion tracking.
815// May be reconsidered in the future if a clear use case emerges.
816//
817// PartialEq/Eq: Not implemented. Each VecBuilder represents a unique parallel computation
818// with distinct synchronization state. Equality comparisons don't have meaningful semantics.
819//
820// Send/Sync: Automatically derived based on generic parameters.
821// VecBuilder is Send when I: Send and B: Send.
822// VecBuilder is Sync when I: Send + Sync and B: Send + Sync.
823// This follows from the Send/Sync properties of its fields (tasks and result).
824//
825// Hash: Not implemented since PartialEq/Eq are not implemented.
826// Default: Not implemented. VecBuilder requires specific parameters (length, strategy, closure).
827// Display: Not implemented. Not typically useful for builder types.
828// From/Into: Not implemented. No obvious conversions to/from other types.
829// AsRef/AsMut: Not implemented. Fields are already public, providing direct access.
830// Deref/DerefMut: Not implemented. VecBuilder is not a wrapper around a single underlying type.
831
832/// Equality comparison for `SliceTask`.
833///
834/// Two tasks are considered equal if they operate on the same range
835/// of the same underlying vector.
836impl<T, B> PartialEq for SliceTask<T, B> {
837 fn eq(&self, other: &Self) -> bool {
838 self.start == other.start
839 && self.past_end == other.past_end
840 && Weak::ptr_eq(&self.own, &other.own)
841 }
842}
843
844impl<T, B> Eq for SliceTask<T, B> {}
845
846/// Hash implementation for `SliceTask`.
847///
848/// Hashes based on the task's range and the underlying vector pointer.
849impl<T, B> std::hash::Hash for SliceTask<T, B> {
850 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
851 self.start.hash(state);
852 self.past_end.hash(state);
853 Weak::as_ptr(&self.own).hash(state);
854 }
855}
856//asref/asmut - sort of hard to implement safely to avoid double-muts.
857
858// VecResult boilerplate
859// Clone: Not implemented. VecResult is a consuming future that takes ownership of the
860// underlying Vec when polled to completion. Cloning would create confusing semantics
861// where multiple futures try to take ownership of the same data.
862//
863// PartialEq/Eq: Not implemented. VecResult is a stateful future with internal synchronization
864// state that gets consumed during polling. Equality comparisons don't have meaningful semantics.
865//
866// Hash: Not implemented since PartialEq/Eq are not implemented.
867//
868// Copy: Not implemented. Contains heap-allocated data via Arc.
869//
870// Default: Not implemented. VecResult requires specific initialization with a vec and shared_waker
871// that coordinate with associated SliceTask instances.
872//
873// Display: Not implemented. Not typically useful for future types.
874//
875// From/Into: Not implemented. No obvious conversions to/from other types.
876//
877// AsRef/AsMut: Not implemented. Internal fields are private implementation details
878// of the future's synchronization mechanism.
879//
880// Deref/DerefMut: Not implemented. VecResult is not a wrapper around a single underlying type.
881//
882// Send/Sync: Automatically derived based on generic parameter I.
883// VecResult is Send when I: Send, and Sync when I: Send + Sync.
884// This follows from the Send/Sync properties of Arc<Vec<MaybeUninit<I>>> and Arc<SharedWaker>.
885
#[cfg(test)]
mod tests {
    use crate::VecResult;

    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// End-to-end: a single-task build produces the identity vector.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_build_vec() {
        let builder = super::build_vec(10, super::Strategy::One, |i: usize| i);
        builder
            .tasks
            .into_iter()
            .for_each(|task| test_executors::spin_on(task));
        let produced = test_executors::spin_on::<VecResult<_>>(builder.result);
        assert_eq!(produced, (0..10).collect::<Vec<_>>());
    }

    /// Ten elements over two tasks split evenly into [0,5) and [5,10).
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_build_vec_tasks() {
        let builder = super::build_vec(10, super::Strategy::Tasks(2), |i: usize| i);
        let ranges: Vec<_> = builder
            .tasks
            .iter()
            .map(|t| (t.start, t.past_end))
            .collect();
        assert_eq!(ranges, [(0, 5), (5, 10)]);
    }

    /// Thirteen elements over three tasks: the last task absorbs the remainder.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_build_vec_tasks_11() {
        let builder = super::build_vec(13, super::Strategy::Tasks(3), |i: usize| i);
        let ranges: Vec<_> = builder
            .tasks
            .iter()
            .map(|t| (t.start, t.past_end))
            .collect();
        assert_eq!(ranges, [(0, 4), (4, 8), (8, 13)]);
    }

    /// 103 elements over three tasks: chunks of 34 with the last taking 35.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_build_vec_tasks_103() {
        let builder = super::build_vec(103, super::Strategy::Tasks(3), |i: usize| i);
        let ranges: Vec<_> = builder
            .tasks
            .iter()
            .map(|t| (t.start, t.past_end))
            .collect();
        assert_eq!(ranges, [(0, 34), (34, 68), (68, 103)]);
    }

    /// `Strategy::Max` yields one single-element task per index.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_max() {
        let builder = super::build_vec(10, super::Strategy::Max, |i: usize| i);
        assert_eq!(builder.tasks.len(), 10);
        for (i, task) in builder.tasks.iter().enumerate() {
            assert_eq!(task.start, i);
            assert_eq!(task.past_end, i + 1);
        }
    }

    /// A task count above `len` is clamped down to one task per element.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_too_many_tasks() {
        let builder = super::build_vec(10, super::Strategy::Tasks(20), |i: usize| i);
        assert_eq!(builder.tasks.len(), 10);
        for (i, task) in builder.tasks.iter().enumerate() {
            assert_eq!(task.start, i);
            assert_eq!(task.past_end, i + 1);
        }
    }

    /// `TasksPerCore` scales with the CPU count and the ranges tile the
    /// vector contiguously.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_tasks_per_core() {
        let builder = super::build_vec(1000, super::Strategy::TasksPerCore(2), |i: usize| i);
        assert_eq!(builder.tasks.len(), 2 * num_cpus::get());
        let mut expected_start = 0;
        for task in &builder.tasks {
            assert_eq!(task.start, expected_start);
            assert!(task.past_end > task.start);
            expected_start = task.past_end;
        }
    }

    /// Tasks and results must be `Send + 'static` so they can be spawned.
    #[test]
    fn test_send() {
        fn assert_send<T: Send>(_t: &T) {}
        fn assert_static<T: 'static>(_t: &T) {}

        let mut builder = super::build_vec(1000, super::Strategy::TasksPerCore(2), |i: usize| i);
        let one_task = builder.tasks.remove(0);

        assert_send(&one_task);
        assert_static(&one_task);

        let the_result = builder.result;
        assert_send(&the_result);
        assert_static(&the_result);
    }

    /// Smoke test: `spawn_on` type-checks and runs against a thread executor.
    #[cfg(feature = "some_executor")]
    #[test]
    fn test_spawn_on() {
        let executor = test_executors::aruntime::SpawnRuntime;
        some_executor::thread_executor::set_thread_executor(Box::new(executor));
        let builder = super::build_vec(10, super::Strategy::Max, |i: usize| i);

        some_executor::thread_executor::thread_executor(|e| {
            // The returned future is intentionally discarded; this test only
            // verifies that spawning does not panic.
            _ = builder.spawn_on(
                &mut e.unwrap().clone_box(),
                some_executor::Priority::unit_test(),
                some_executor::hint::Hint::default(),
            );
        });
    }
}