// vec_parallel/lib.rs

// SPDX-License-Identifier: MIT OR Apache-2.0
/*!

![logo](../../../art/logo.png)

A library for building vectors in parallel using async tasks.

This crate provides an efficient, executor-agnostic way to construct `Vec<T>` by dividing
the work into multiple async tasks that can run concurrently. It's particularly useful for
CPU-bound initialization tasks where elements can be computed independently.

# Overview

vec_parallel allows you to parallelize the construction of vectors by splitting the work
across multiple async tasks. Each task is responsible for computing a portion of the vector,
writing directly to the final memory location to avoid unnecessary copies.

# Key Features

- **Flexible parallelization strategies**: Control task creation with [`Strategy`]
- **Zero-copy construction**: Elements are written directly to their final location
- **Executor-agnostic**: Works with any async runtime (tokio, async-std, smol, etc.)
- **Optional executor integration**: Use the `some_executor` feature for convenient spawning
- **WASM support**: Works in browser environments with wasm-bindgen
- **Safe abstraction**: Careful use of unsafe code with documented invariants

# Usage Patterns

## Basic Usage

```
use vec_parallel::{build_vec, Strategy};

// Build a vector of squares using multiple tasks
let builder = build_vec(100, Strategy::TasksPerCore(4), |i| i * i);

// Run the tasks (in a real application, these would be spawned on an executor)
for task in builder.tasks {
    test_executors::spin_on(task);
}

// Get the final result
let squares = test_executors::spin_on(builder.result);
assert_eq!(squares[10], 100); // 10² = 100
```

## With Async Executors

```
use vec_parallel::{build_vec, Strategy};

# async fn example() {
// With tokio (or any async runtime)
let builder = build_vec(1000, Strategy::TasksPerCore(4), |i| {
    // Expensive computation
    (0..100).map(|j| (i + j) * 2).sum::<usize>()
});

// Spawn tasks on your executor
for task in builder.tasks {
    // In a real app: tokio::spawn(task);
    test_executors::spin_on(task);
}

// Await the result
let result = test_executors::spin_on(builder.result);
assert_eq!(result.len(), 1000);
# }
# test_executors::spin_on(example());
```

## Choosing a Strategy

The [`Strategy`] enum controls how work is divided:

- [`Strategy::One`]: No parallelism, single task
- [`Strategy::Tasks`]: Exactly `n` tasks
- [`Strategy::Max`]: One task per element (maximum parallelism)
- [`Strategy::TasksPerCore`]: `n` tasks per CPU core (recommended for CPU-bound work)

# Performance Considerations

- For CPU-bound work, use `Strategy::TasksPerCore(4)` to `Strategy::TasksPerCore(8)`
- For I/O-bound work, consider higher task counts
- For small vectors (<100 elements), parallelization overhead may not be worth it
- The library uses atomic operations for synchronization, avoiding locks

# Safety

This library uses `unsafe` code internally for performance, but maintains safety through:

- Non-overlapping slice assignments for each task
- Atomic counters for task completion tracking
- Careful lifetime management with `Arc` and `Weak` references
- All unsafe operations are documented with their safety invariants

# Optional Features

- `some_executor`: Enables integration with the `some_executor` crate for convenient task spawning
*/
101
use atomic_waker::AtomicWaker;
use std::future::Future;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};

/// Re-export the some_executor crate when the feature is enabled.
///
/// This provides access to executor utilities for convenient task spawning.
#[cfg(feature = "some_executor")]
pub use some_executor;
114
/// Determines how work is divided among parallel tasks.
///
/// The strategy controls how many tasks are created to build the vector.
/// Different strategies are optimal for different workloads.
///
/// # Examples
///
/// ```
/// use vec_parallel::{build_vec, Strategy};
///
/// // Use a single task (no parallelism)
/// let builder = build_vec(10, Strategy::One, |i| i * 2);
/// # for task in builder.tasks { test_executors::spin_on(task); }
/// # let result = test_executors::spin_on(builder.result);
///
/// // Use exactly 4 tasks
/// let builder = build_vec(100, Strategy::Tasks(4), |i| i * 2);
/// # for task in builder.tasks { test_executors::spin_on(task); }
/// # let result = test_executors::spin_on(builder.result);
///
/// // Create one task per element (maximum parallelism)
/// let builder = build_vec(10, Strategy::Max, |i| i * 2);
/// # for task in builder.tasks { test_executors::spin_on(task); }
/// # let result = test_executors::spin_on(builder.result);
///
/// // Create 4 tasks per CPU core
/// let builder = build_vec(1000, Strategy::TasksPerCore(4), |i| i * 2);
/// # for task in builder.tasks { test_executors::spin_on(task); }
/// # let result = test_executors::spin_on(builder.result);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Strategy {
    /// Creates a single task to build the entire vector.
    ///
    /// This effectively disables parallelism and is equivalent to
    /// building the vector sequentially.
    One,
    /// Creates exactly the specified number of tasks.
    ///
    /// The work is divided evenly among the tasks. If the number of tasks
    /// exceeds the number of elements, it is automatically reduced.
    ///
    /// # Example
    ///
    /// ```
    /// # use vec_parallel::{build_vec, Strategy};
    /// // Create 4 tasks to build a vector of 100 elements
    /// // Each task will handle 25 elements
    /// let builder = build_vec(100, Strategy::Tasks(4), |i| i);
    /// assert_eq!(builder.tasks.len(), 4);
    ///
    /// // Run all tasks and verify the result
    /// for task in builder.tasks {
    ///     test_executors::spin_on(task);
    /// }
    /// let result = test_executors::spin_on(builder.result);
    /// assert_eq!(result.len(), 100);
    /// assert_eq!(result[0], 0);
    /// assert_eq!(result[99], 99);
    /// ```
    Tasks(usize),
    /// Creates one task per element (maximum parallelism).
    ///
    /// This strategy provides the finest granularity but may have
    /// higher overhead for simple computations.
    ///
    /// # Example
    ///
    /// ```
    /// # use vec_parallel::{build_vec, Strategy};
    /// // Create 10 tasks for 10 elements
    /// let builder = build_vec(10, Strategy::Max, |i| i);
    /// assert_eq!(builder.tasks.len(), 10);
    /// ```
    Max,
    /// Creates a number of tasks based on the CPU core count.
    ///
    /// The total number of tasks is `cores * multiplier`. This is ideal
    /// for CPU-bound workloads. Common multipliers are 4-8 for balanced
    /// performance.
    ///
    /// # Example
    ///
    /// ```
    /// # use vec_parallel::{build_vec, Strategy};
    /// // On a 4-core system, this creates 16 tasks
    /// let builder = build_vec(1000, Strategy::TasksPerCore(4), |i| i);
    /// let expected_tasks = 4 * num_cpus::get();
    /// assert_eq!(builder.tasks.len(), expected_tasks);
    ///
    /// // Run all tasks and verify work distribution
    /// for task in builder.tasks {
    ///     test_executors::spin_on(task);
    /// }
    /// let result = test_executors::spin_on(builder.result);
    /// assert_eq!(result.len(), 1000);
    /// // Verify all elements were computed correctly
    /// for (i, &val) in result.iter().enumerate() {
    ///     assert_eq!(val, i);
    /// }
    /// ```
    TasksPerCore(usize),
}
219
/// Shared state for coordinating task completion.
///
/// This structure is shared among all [`SliceTask`]s and the [`VecResult`]
/// to track when all tasks have completed.
#[derive(Debug)]
struct SharedWaker {
    /// Number of tasks that haven't completed yet.
    /// Decremented (Release) by each task as it finishes; the result future
    /// observes completion with an Acquire load of zero.
    outstanding_tasks: AtomicUsize,
    /// Waker to notify when all tasks complete.
    /// The last finishing task (the one that decrements the counter to zero)
    /// calls `wake()`.
    waker: AtomicWaker,
}
231
/// A future that builds a slice of the final vector.
///
/// Each `SliceTask` is responsible for computing and storing elements
/// for a specific range of indices. Multiple tasks can run concurrently
/// to build different parts of the vector in parallel.
///
/// # Example
///
/// ```
/// use vec_parallel::{build_vec, Strategy};
///
/// let builder = build_vec(20, Strategy::Tasks(2), |i| i * 3);
///
/// // Each task handles a portion of the vector
/// assert_eq!(builder.tasks.len(), 2);
///
/// // Tasks can be polled independently
/// for mut task in builder.tasks {
///     test_executors::spin_on(task);
/// }
/// ```
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SliceTask<T, B> {
    /// A function that builds the value at each index in the task's range.
    build: B,
    /// A weak reference to the shared vector storage.
    /// The strong reference lives in [`VecResult`]; the task upgrades this
    /// weak reference while writing so the buffer cannot be freed mid-write.
    own: Weak<Vec<MaybeUninit<T>>>,
    /// The starting index (inclusive) for this task's range.
    start: usize,
    /// The ending index (exclusive) for this task's range.
    past_end: usize,
    /// Shared synchronization state with other tasks and the result future.
    shared_waker: Arc<SharedWaker>,
    /// Flag to prevent double execution of the task.
    poison: bool,
}
270
/// `SliceTask` is `Send` when both `T` and `B` are `Send`.
///
/// # Safety
///
/// This is safe because:
/// - The `Weak<Vec<MaybeUninit<T>>>` only writes to non-overlapping ranges
///   (each task owns the disjoint half-open range `[start, past_end)`)
/// - The `SharedWaker` uses atomic operations for synchronization
/// - The builder function `B` is moved into the task
unsafe impl<T, B> Send for SliceTask<T, B>
where
    T: Send,
    B: Send,
{
}
285
286impl<T, B> SliceTask<T, B>
287where
288    B: FnMut(usize) -> T,
289{
290    /// Executes this task synchronously, building all elements in its range.
291    ///
292    /// This method is an alternative to polling the future and is useful when
293    /// you want to run tasks on specific threads or in a blocking context.
294    ///
295    /// # Panics
296    ///
297    /// Panics if called after the task has already completed.
298    ///
299    /// # Example
300    ///
301    /// ```
302    /// use vec_parallel::{build_vec, Strategy};
303    ///
304    /// let mut builder = build_vec(10, Strategy::One, |i| i * 2);
305    ///
306    /// // Run the task manually
307    /// builder.tasks[0].run();
308    ///
309    /// let result = test_executors::spin_on(builder.result);
310    /// assert_eq!(result, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
311    /// ```
312    pub fn run(&mut self) {
313        assert!(!self.poison, "Polling after completion");
314        let own = self.own.upgrade().expect("SliceTask was deallocated");
315        for i in self.start..self.past_end {
316            unsafe {
317                let ptr = own.as_ptr();
318                // assuming our slices are nonoverlapping, we can write to the slice
319                let ptr = ptr as *mut MaybeUninit<T>;
320                ptr.add(i).write(MaybeUninit::new((self.build)(i)));
321            }
322        }
323        let old = self
324            .shared_waker
325            .outstanding_tasks
326            .fetch_sub(1, std::sync::atomic::Ordering::Release);
327        if old == 1 {
328            self.shared_waker.waker.wake();
329        }
330        self.poison = true;
331    }
332
333    /// Helper function to run the task in a depinned state.
334    ///
335    /// # Safety
336    ///
337    /// This function is unsafe because:
338    /// - It assumes `vec_base` points to valid, allocated memory
339    /// - It assumes no other tasks write to the range `[start, past_end)`
340    /// - It assumes the vec won't be deallocated during execution
341    /// - Caller must ensure `poison` is set to prevent double execution
342    unsafe fn run_depinned(
343        start: usize,
344        past_end: usize,
345        build: &mut B,
346        vec_base: *mut MaybeUninit<T>,
347        shared_waker: &Arc<SharedWaker>,
348        poison: &mut bool,
349    ) {
350        for i in start..past_end {
351            unsafe {
352                // assuming our slices are nonoverlapping, we can write to the slice
353                let ptr = vec_base.add(i);
354                ptr.write(MaybeUninit::new(build(i)));
355            }
356        }
357        let old = shared_waker
358            .outstanding_tasks
359            .fetch_sub(1, std::sync::atomic::Ordering::Release);
360        if old == 1 {
361            shared_waker.waker.wake();
362        }
363        *poison = true;
364    }
365}
366
impl<T, B> Future for SliceTask<T, B>
where
    B: FnMut(usize) -> T,
{
    type Output = ();

    /// Polls the task to completion.
    ///
    /// This method computes all elements in the task's assigned range and
    /// writes them directly to the shared vector. Once complete, it decrements
    /// the outstanding task counter and wakes the result future if this was
    /// the last task. The future always completes on the first poll, so the
    /// context's waker is never registered (hence `_cx`).
    ///
    /// # Panics
    ///
    /// Panics if polled after completion (poison flag is set), or if the
    /// shared vector has been deallocated (the [`VecResult`] was dropped
    /// before the tasks ran).
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        assert!(!self.poison, "Polling after completion");
        //pin-project
        // Manually split the pinned struct into disjoint mutable borrows of
        // its fields. SAFETY relies on never moving the value out of the pin.
        let (start, past_end, build, weak_own, shared_waker, poison) = unsafe {
            let s = self.get_unchecked_mut();
            let start = &mut s.start;
            let past_end = &mut s.past_end;
            let build = &mut s.build;
            let weak_own = &mut s.own;
            let shared_waker = &mut s.shared_waker;
            (
                start,
                past_end,
                build,
                weak_own,
                shared_waker,
                &mut s.poison,
            )
        };
        unsafe {
            // Upgrade to a strong reference so the storage stays alive while
            // `run_depinned` writes into it.
            let safe_arc = weak_own.upgrade().expect("SliceTask was deallocated");
            // assuming our slices are nonoverlapping, we can write to the slice
            let ptr = safe_arc.as_ptr() as *mut MaybeUninit<T>;
            Self::run_depinned(*start, *past_end, build, ptr, shared_waker, poison);
        }
        Poll::Ready(())
    }
}
411
/// A future that resolves to the completed vector.
///
/// This future waits for all [`SliceTask`]s to complete and then assembles
/// the final vector. It uses atomic operations to track task completion
/// without requiring locks.
///
/// # Example
///
/// ```
/// use vec_parallel::{build_vec, Strategy};
///
/// let builder = build_vec(5, Strategy::Max, |i| format!("Item {}", i));
///
/// // Complete all tasks
/// for task in builder.tasks {
///     test_executors::spin_on(task);
/// }
///
/// // Get the final vector
/// let result = test_executors::spin_on(builder.result);
/// assert_eq!(result[0], "Item 0");
/// assert_eq!(result[4], "Item 4");
/// ```
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct VecResult<I> {
    /// The shared storage. `Some` until the future completes, after which it
    /// is taken; polling again after completion panics.
    vec: Option<Arc<Vec<MaybeUninit<I>>>>,
    /// Synchronization state shared with the `SliceTask`s.
    shared_waker: Arc<SharedWaker>,
}
441impl<I> Future for VecResult<I> {
442    type Output = Vec<I>;
443
444    /// Polls the result future, returning the completed vector when all tasks finish.
445    ///
446    /// This method checks if all tasks have completed (outstanding_tasks == 0).
447    /// When they have, it converts the `Vec<MaybeUninit<I>>` to `Vec<I>` and
448    /// returns it. Otherwise, it registers the waker and returns `Pending`.
449    ///
450    /// # Safety
451    ///
452    /// The conversion from `MaybeUninit<I>` to `I` is safe because we only
453    /// perform it after all tasks have completed, guaranteeing all elements
454    /// are initialized.
455    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
456        self.shared_waker.waker.register(cx.waker());
457        if self
458            .shared_waker
459            .outstanding_tasks
460            .load(std::sync::atomic::Ordering::Acquire)
461            == 0
462        {
463            let v = self.vec.take().expect("Polling after completion");
464            let mut v = Arc::into_inner(v).expect("Too many references");
465            let ptr = v.as_mut_ptr() as *mut I;
466            let len = v.len();
467            let cap = v.capacity();
468            // SAFETY:
469            // 1. We got ptr/len/cap from a valid Vec
470            // 2. MaybeUninit<T> and T have the same size and alignment
471            // 3. We check for double-free
472            let f = unsafe {
473                let f = Vec::from_raw_parts(ptr, len, cap);
474                std::mem::forget(v);
475                f
476            };
477            Poll::Ready(f)
478        } else {
479            Poll::Pending
480        }
481    }
482}
483
/// Contains the tasks and result future for building a vector in parallel.
///
/// A `VecBuilder` is created by [`build_vec`] and contains:
/// - `tasks`: Individual futures that build portions of the vector
/// - `result`: A future that resolves to the completed vector
///
/// # Usage Pattern
///
/// 1. Create a builder with [`build_vec`]
/// 2. Spawn or poll the tasks (potentially on different threads)
/// 3. Await the result to get the final vector
///
/// # Example
///
/// ```
/// use vec_parallel::{build_vec, Strategy};
///
/// async fn build_squares() -> Vec<u32> {
///     let builder = build_vec(10, Strategy::TasksPerCore(2), |i| {
///         // Simulate expensive computation
///         (i as u32) * (i as u32)
///     });
///
///     // In a real application, spawn tasks on your executor
///     for task in builder.tasks {
///         // tokio::spawn(task);
///         # test_executors::spin_on(task);
///     }
///
///     // Wait for completion
///     # test_executors::spin_on(builder.result)
///     // builder.result.await
/// }
/// # test_executors::spin_on(build_squares());
/// ```
#[derive(Debug)]
pub struct VecBuilder<I, B> {
    /// The individual tasks that build portions of the vector.
    ///
    /// These can be spawned on an executor or polled manually.
    /// Every task must run to completion before `result` resolves.
    pub tasks: Vec<SliceTask<I, B>>,
    /// A future that resolves to the completed vector.
    ///
    /// This should be awaited after all tasks have been spawned.
    pub result: VecResult<I>,
}
530
/// Re-export of `some_executor::hint::Hint` for convenient access.
///
/// Available when the `some_executor` feature is enabled.
#[cfg(feature = "some_executor")]
pub type Hint = some_executor::hint::Hint;

/// Re-export of `some_executor::Priority` for convenient access.
///
/// Available when the `some_executor` feature is enabled.
#[cfg(feature = "some_executor")]
pub type Priority = some_executor::Priority;
542
#[cfg(feature = "some_executor")]
impl<I, B> VecBuilder<I, B> {
    /// Spawns all tasks on the provided executor and awaits the result.
    ///
    /// This is a convenience method that handles task spawning and result
    /// awaiting in one call. The tasks are spawned with the specified
    /// priority and hint.
    ///
    /// # Arguments
    ///
    /// * `executor` - The executor to spawn tasks on
    /// * `priority` - Task priority for scheduling
    /// * `hint` - Execution hint for the executor
    ///
    /// # Example
    ///
    /// ```
    /// # #[cfg(feature = "some_executor")]
    /// # {
    /// use vec_parallel::{build_vec, Strategy};
    ///
    /// # // Mock executor setup for testing
    /// # use std::future::Future;
    /// # use std::pin::Pin;
    /// # use std::task::{Context, Poll, Waker};
    /// # use std::sync::{Arc, Mutex};
    /// # struct MockExecutor {
    /// #     tasks: Arc<Mutex<Vec<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
    /// # }
    /// # impl MockExecutor {
    /// #     fn new() -> Self {
    /// #         Self { tasks: Arc::new(Mutex::new(Vec::new())) }
    /// #     }
    /// #     fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
    /// #         self.tasks.lock().unwrap().push(Box::pin(future));
    /// #     }
    /// # }
    ///
    /// # async fn example() {
    /// // Example with a real executor that implements SomeExecutor
    /// // In practice, you would use your async runtime's spawning mechanism
    ///
    /// let builder = build_vec(100, Strategy::TasksPerCore(4), |i| i * i);
    ///
    /// // Spawn tasks on the current executor
    /// // This example shows how you would integrate with tokio:
    /// /*
    /// for task in builder.tasks {
    ///     tokio::spawn(task);
    /// }
    /// let squares = builder.result.await;
    /// */
    ///
    /// // Or with the spawn_on method when using some_executor feature:
    /// // let squares = builder.spawn_on(
    /// //     &mut executor,
    /// //     some_executor::Priority::default(),
    /// //     some_executor::hint::Hint::default()
    /// // ).await;
    ///
    /// # // For testing, run tasks synchronously
    /// # let mut builder = build_vec(100, Strategy::TasksPerCore(4), |i| i * i);
    /// # for task in builder.tasks {
    /// #     test_executors::spin_on(task);
    /// # }
    /// # let squares = test_executors::spin_on(builder.result);
    /// # assert_eq!(squares[10], 100);
    /// # }
    /// # test_executors::spin_on(example());
    /// # }
    /// ```
    pub async fn spawn_on<E: some_executor::SomeExecutor>(
        mut self,
        executor: &mut E,
        priority: some_executor::Priority,
        hint: some_executor::hint::Hint,
    ) -> Vec<I>
    where
        I: Send,
        B: FnMut(usize) -> I,
        B: Send,
        I: 'static,
        B: 'static,
    {
        use some_executor::task::{ConfigurationBuilder, Task};

        // All tasks share the same scheduling configuration.
        let configuration = ConfigurationBuilder::new()
            .priority(priority)
            .hint(hint)
            .build();

        // NOTE(review): the observers are collected and kept alive until the
        // result resolves — presumably so spawned tasks are not cancelled on
        // observer drop; confirm against some_executor's spawn semantics.
        let mut observers = Vec::with_capacity(self.tasks.len());
        for (t, task) in self.tasks.drain(..).enumerate() {
            let label = format!("VecBuilder task {}", t);
            let t = Task::without_notifications(label, configuration.clone(), task);
            let o = executor.spawn(t);
            observers.push(o);
        }
        self.result.await
    }
}
644
645/// Creates a builder for constructing a vector in parallel.
646///
647/// This function divides the work of building a vector into multiple async tasks
648/// based on the specified strategy. Each task computes elements for a range of
649/// indices using the provided closure.
650///
651/// # Arguments
652///
653/// * `len` - The length of the vector to create
654/// * `strategy` - How to divide the work among tasks (see [`Strategy`])
655/// * `f` - A closure that computes the element at a given index
656///
657/// # Type Parameters
658///
659/// * `R` - The element type of the resulting vector
660/// * `B` - The closure type (must be `FnMut(usize) -> R` and `Clone`)
661///
662/// # Implementation Details
663///
664/// The function pre-allocates a `Vec<MaybeUninit<R>>` of the requested size,
665/// wraps it in an `Arc` for sharing among tasks, and creates `SliceTask`s
666/// that each handle a non-overlapping range of indices. Task completion is
667/// tracked via atomic operations.
668///
669/// # Examples
670///
671/// ## Basic usage
672///
673/// ```
674/// use vec_parallel::{build_vec, Strategy};
675///
676/// // Build a vector of squares
677/// let builder = build_vec(10, Strategy::Tasks(2), |i| i * i);
678///
679/// // Execute tasks
680/// for task in builder.tasks {
681///     test_executors::spin_on(task);
682/// }
683///
684/// // Get result
685/// let squares = test_executors::spin_on(builder.result);
686/// assert_eq!(squares, vec![0, 1, 4, 9, 16, 25, 36, 49, 64, 81]);
687/// ```
688///
689/// ## Complex computation with shared state
690///
691/// ```
692/// use vec_parallel::{build_vec, Strategy};
693/// use std::sync::Arc;
694///
695/// // Shared configuration for all tasks
696/// let config = Arc::new(vec![1.0, 2.0, 3.0, 4.0]);
697/// let config_clone = config.clone();
698///
699/// let builder = build_vec(8, Strategy::Tasks(2), move |i| {
700///     // Each task gets its own clone of the Arc
701///     let sum: f64 = config_clone.iter().map(|&x| x * (i as f64)).sum();
702///     sum
703/// });
704///
705/// for task in builder.tasks {
706///     test_executors::spin_on(task);
707/// }
708///
709/// let result = test_executors::spin_on(builder.result);
710/// assert_eq!(result[0], 0.0);  // 0 * (1+2+3+4)
711/// assert_eq!(result[1], 10.0); // 1 * (1+2+3+4)
712/// ```
713///
714/// ## With expensive computation
715///
716/// ```
717/// use vec_parallel::{build_vec, Strategy};
718///
719/// fn expensive_computation(n: usize) -> u64 {
720///     // Simulate expensive work
721///     (0..1000).map(|i| (n + i) as u64).sum()
722/// }
723///
724/// let builder = build_vec(100, Strategy::TasksPerCore(4), expensive_computation);
725///
726/// // Track which tasks complete first
727/// let mut completed = 0;
728/// for mut task in builder.tasks {
729///     task.run();
730///     completed += 1;
731/// }
732/// assert_eq!(completed, 4 * num_cpus::get());
733///
734/// let result = test_executors::spin_on(builder.result);
735/// assert_eq!(result.len(), 100);
736/// assert_eq!(result[0], expensive_computation(0));
737/// ```
738///
739/// ## Stateful closures
740///
741/// ```
742/// use vec_parallel::{build_vec, Strategy};
743///
744/// let offset = 100;
745/// let builder = build_vec(5, Strategy::Max, move |i| i + offset);
746///
747/// # for task in builder.tasks { test_executors::spin_on(task); }
748/// let result = test_executors::spin_on(builder.result);
749/// assert_eq!(result, vec![100, 101, 102, 103, 104]);
750/// ```
751/// # Returns
752///
753/// A [`VecBuilder`] containing the tasks to execute and a result future.
754///
755/// # Panics
756///
757/// This function does not panic. If more tasks are requested than elements,
758/// the task count is automatically reduced.
759pub fn build_vec<R, B>(len: usize, strategy: Strategy, f: B) -> VecBuilder<R, B>
760where
761    B: FnMut(usize) -> R,
762    B: Clone,
763{
764    let mut vec = Vec::with_capacity(len);
765    vec.resize_with(len, MaybeUninit::uninit);
766
767    let vec_arc = Arc::new(vec);
768    match strategy {
769        Strategy::One => build_vec(len, Strategy::Tasks(1), f),
770        Strategy::Tasks(tasks) => {
771            if tasks > len {
772                return build_vec(len, Strategy::Tasks(len), f);
773            }
774            let shared_waker = Arc::new(SharedWaker {
775                outstanding_tasks: AtomicUsize::new(tasks),
776                waker: AtomicWaker::new(),
777            });
778            let mut task_vec = Vec::with_capacity(tasks);
779            let chunk = len / tasks;
780            for i in 0..tasks {
781                let start = i * chunk;
782                let end = if i + 1 == tasks { len } else { start + chunk };
783                let task = SliceTask {
784                    build: f.clone(),
785                    own: Arc::downgrade(&vec_arc),
786                    start,
787                    past_end: end,
788                    shared_waker: shared_waker.clone(),
789                    poison: false,
790                };
791                task_vec.push(task);
792            }
793            let result = VecResult {
794                vec: Some(vec_arc),
795                shared_waker,
796            };
797            VecBuilder {
798                tasks: task_vec,
799                result,
800            }
801        }
802        Strategy::Max => build_vec(len, Strategy::Tasks(len), f),
803        Strategy::TasksPerCore(tasks_per_core) => {
804            let tasks = tasks_per_core * num_cpus::get();
805            build_vec(len, Strategy::Tasks(tasks), f)
806        }
807    }
808}
809
// Implementation notes for trait implementations:
//
// VecBuilder trait implementations
// Clone: Not implemented. VecBuilder contains mutable tasks with shared synchronization state.
// Cloning would create confusing semantics around task execution and completion tracking.
// May be reconsidered in the future if a clear use case emerges.
//
// PartialEq/Eq: Not implemented. Each VecBuilder represents a unique parallel computation
// with distinct synchronization state. Equality comparisons don't have meaningful semantics.
//
// Send/Sync: Automatically derived based on generic parameters.
// VecBuilder is Send when I: Send and B: Send.
// VecBuilder is Sync when I: Send + Sync and B: Send + Sync.
// This follows from the Send/Sync properties of its fields (tasks and result).
//
// Hash: Not implemented since PartialEq/Eq are not implemented.
// Default: Not implemented. VecBuilder requires specific parameters (length, strategy, closure).
// Display: Not implemented. Not typically useful for builder types.
// From/Into: Not implemented. No obvious conversions to/from other types.
// AsRef/AsMut: Not implemented. Fields are already public, providing direct access.
// Deref/DerefMut: Not implemented. VecBuilder is not a wrapper around a single underlying type.
831
832/// Equality comparison for `SliceTask`.
833///
834/// Two tasks are considered equal if they operate on the same range
835/// of the same underlying vector.
836impl<T, B> PartialEq for SliceTask<T, B> {
837    fn eq(&self, other: &Self) -> bool {
838        self.start == other.start
839            && self.past_end == other.past_end
840            && Weak::ptr_eq(&self.own, &other.own)
841    }
842}
843
844impl<T, B> Eq for SliceTask<T, B> {}
845
846/// Hash implementation for `SliceTask`.
847///
848/// Hashes based on the task's range and the underlying vector pointer.
849impl<T, B> std::hash::Hash for SliceTask<T, B> {
850    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
851        self.start.hash(state);
852        self.past_end.hash(state);
853        Weak::as_ptr(&self.own).hash(state);
854    }
855}
// AsRef/AsMut: somewhat hard to implement safely here while avoiding aliased mutable borrows.

// VecResult boilerplate
// Clone: Not implemented. VecResult is a consuming future that takes ownership of the
// underlying Vec when polled to completion. Cloning would create confusing semantics
// where multiple futures try to take ownership of the same data.
//
// PartialEq/Eq: Not implemented. VecResult is a stateful future with internal synchronization
// state that gets consumed during polling. Equality comparisons don't have meaningful semantics.
//
// Hash: Not implemented since PartialEq/Eq are not implemented.
//
// Copy: Not implemented. Contains heap-allocated data via Arc.
//
// Default: Not implemented. VecResult requires specific initialization with a vec and shared_waker
// that coordinate with associated SliceTask instances.
//
// Display: Not implemented. Not typically useful for future types.
//
// From/Into: Not implemented. No obvious conversions to/from other types.
//
// AsRef/AsMut: Not implemented. Internal fields are private implementation details
// of the future's synchronization mechanism.
//
// Deref/DerefMut: Not implemented. VecResult is not a wrapper around a single underlying type.
//
// Send/Sync: Automatically derived based on generic parameter I.
// VecResult is Send when I: Send, and Sync when I: Send + Sync.
// This follows from the Send/Sync properties of Arc<Vec<MaybeUninit<I>>> and Arc<SharedWaker>.
885
#[cfg(test)]
mod tests {
    use crate::VecResult;

    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Asserts that `tasks` are consecutive single-element ranges covering
    /// `0..len` in order. Shared by the strategies that degenerate to one
    /// element per task.
    fn assert_unit_tasks<T, B>(tasks: &[super::SliceTask<T, B>], len: usize) {
        assert_eq!(tasks.len(), len);
        for (i, task) in tasks.iter().enumerate() {
            assert_eq!(task.start, i);
            assert_eq!(task.past_end, i + 1);
        }
    }

    /// End-to-end: run every task to completion, then poll the result future
    /// and verify the assembled vector's contents.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_build_vec() {
        let builder = super::build_vec(10, super::Strategy::One, |i: usize| i);
        for task in builder.tasks {
            test_executors::spin_on(task);
        }
        let o = test_executors::spin_on::<VecResult<_>>(builder.result);
        assert_eq!(o, (0..10).collect::<Vec<_>>());
    }

    /// Even split: 10 elements over 2 tasks yields two 5-element ranges.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_build_vec_tasks() {
        let builder = super::build_vec(10, super::Strategy::Tasks(2), |i: usize| i);
        assert_eq!(builder.tasks.len(), 2);
        assert_eq!(builder.tasks[0].start, 0);
        assert_eq!(builder.tasks[0].past_end, 5);
        assert_eq!(builder.tasks[1].start, 5);
        assert_eq!(builder.tasks[1].past_end, 10);
    }

    /// Uneven split: 13 elements over 3 tasks; the final task absorbs the
    /// remainder (4 + 4 + 5). Renamed from `test_build_vec_tasks_11` — the
    /// test has always used length 13, so the old name was misleading.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_build_vec_tasks_13() {
        let builder = super::build_vec(13, super::Strategy::Tasks(3), |i: usize| i);
        assert_eq!(builder.tasks.len(), 3);
        assert_eq!(builder.tasks[0].start, 0);
        assert_eq!(builder.tasks[0].past_end, 4);
        assert_eq!(builder.tasks[1].start, 4);
        assert_eq!(builder.tasks[1].past_end, 8);
        assert_eq!(builder.tasks[2].start, 8);
        assert_eq!(builder.tasks[2].past_end, 13);
    }

    /// Uneven split: 103 elements over 3 tasks (34 + 34 + 35).
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_build_vec_tasks_103() {
        let builder = super::build_vec(103, super::Strategy::Tasks(3), |i: usize| i);
        assert_eq!(builder.tasks.len(), 3);
        assert_eq!(builder.tasks[0].start, 0);
        assert_eq!(builder.tasks[0].past_end, 34);
        assert_eq!(builder.tasks[1].start, 34);
        assert_eq!(builder.tasks[1].past_end, 68);
        assert_eq!(builder.tasks[2].start, 68);
        assert_eq!(builder.tasks[2].past_end, 103);
    }

    /// `Strategy::Max` creates one task per element.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_max() {
        let builder = super::build_vec(10, super::Strategy::Max, |i: usize| i);
        assert_unit_tasks(&builder.tasks, 10);
    }

    /// Requesting more tasks than elements clamps to one task per element.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_too_many_tasks() {
        let builder = super::build_vec(10, super::Strategy::Tasks(20), |i: usize| i);
        assert_unit_tasks(&builder.tasks, 10);
    }

    /// `TasksPerCore(n)` produces `n * num_cpus` contiguous, non-empty tasks
    /// that together cover the full range.
    #[cfg_attr(not(target_arch = "wasm32"), test)]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_tasks_per_core() {
        let builder = super::build_vec(1000, super::Strategy::TasksPerCore(2), |i: usize| i);
        let cpus = num_cpus::get();
        assert_eq!(builder.tasks.len(), 2 * cpus);
        let mut start = 0;
        for task in builder.tasks {
            assert_eq!(task.start, start);
            assert!(task.past_end > start);
            start = task.past_end;
        }
        // The ranges must tile the entire vector, not just be contiguous.
        assert_eq!(start, 1000, "tasks must cover the full 0..1000 range");
    }

    /// Tasks and results must be Send + 'static so they can be spawned on
    /// arbitrary executors.
    #[test]
    fn test_send() {
        fn is_send<T: Send>(_t: &T) {}
        fn is_static<T: 'static>(_t: &T) {}

        let mut builder = super::build_vec(1000, super::Strategy::TasksPerCore(2), |i: usize| i);
        let a_task = builder.tasks.remove(0);

        is_send(&a_task);
        is_static(&a_task);

        let a_result = builder.result;
        is_send(&a_result);
        is_static(&a_result);
    }

    /// Smoke test for the `some_executor` integration: spawning the builder's
    /// tasks on the thread-local executor must not panic.
    #[cfg(feature = "some_executor")]
    #[test]
    fn test_spawn_on() {
        let executor = test_executors::aruntime::SpawnRuntime;
        some_executor::thread_executor::set_thread_executor(Box::new(executor));
        let builder = super::build_vec(10, super::Strategy::Max, |i: usize| i);

        some_executor::thread_executor::thread_executor(|e| {
            _ = builder.spawn_on(
                &mut e.unwrap().clone_box(),
                some_executor::Priority::unit_test(),
                some_executor::hint::Hint::default(),
            );
        });
    }
}