1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
use super::TaskPool;

/// Helper functions to map over the chunks of a slice in parallel.
///
/// The trait is blanket-implemented for every sized type that is `AsRef<[T]>` (see the `impl`
/// directly below the trait), so it can be called on `Vec<T>`, arrays, `&[T]`, and so on.
pub trait ParallelSlice<T: Sync>: AsRef<[T]> {
    /// Iterates in parallel over a slice with a given chunk size.
    ///
    /// The slice is split with [`slice::chunks`], one task is spawned on `task_pool` per chunk,
    /// and each task calls `f` on its chunk. The returned vector holds the values produced by
    /// `f`, as collected by the pool's `scope` call.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is 0, because [`slice::chunks`] rejects a zero chunk size.
    ///
    /// # Example
    ///
    /// ```
    /// use entangled::*;
    ///
    /// let v = vec![42; 1000];
    /// let task_pool = TaskPool::default();
    /// let outputs = v.par_chunk_map(&task_pool, 512, |numbers| -> i32 { numbers.iter().sum() });
    /// let mut sum = 0;
    ///
    /// for output in outputs {
    ///     sum += output;
    /// }
    ///
    /// assert_eq!(sum, 1000 * 42);
    /// ```
    fn par_chunk_map<F, R>(&self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec<R>
    where
        F: Fn(&[T]) -> R + Send + Sync,
        R: Send + 'static,
    {
        let slice = self.as_ref();
        // Every task shares the same closure by reference; `&F` is `Copy`, so the
        // `async move` block below takes its own copy of the reference.
        let f = &f;
        task_pool.scope(|scope| {
            for chunk in slice.chunks(chunk_size) {
                scope.spawn(async move { f(chunk) });
            }
        })
    }

    /// Iterates in parallel over a slice with a given maximum of tasks.
    ///
    /// The slice is split into chunks of near-equal size so that the number of spawned tasks
    /// never exceeds the smaller of `max_tasks` and the thread number of the `TaskPool`. If
    /// `max_tasks` is `None`, only the thread number of the `TaskPool` limits the task count.
    /// A limit of zero (either `Some(0)` or a pool reporting zero threads) is treated as one,
    /// so the whole slice is then processed by a single task. An empty slice spawns no tasks.
    ///
    /// # Example
    ///
    /// ```
    /// use entangled::*;
    ///
    /// let v = vec![42; 1000];
    /// let task_pool = TaskPool::default();
    /// let outputs = v.par_splat_map(&task_pool, Some(2), |numbers| -> i32 { numbers.iter().sum() });
    /// let mut sum = 0;
    ///
    /// for output in outputs {
    ///     sum += output;
    /// }
    ///
    /// assert_eq!(sum, 1000 * 42);
    /// ```
    fn par_splat_map<F, R>(&self, task_pool: &TaskPool, max_tasks: Option<usize>, f: F) -> Vec<R>
    where
        F: Fn(&[T]) -> R + Send + Sync,
        R: Send + 'static,
    {
        let slice = self.as_ref();
        let len = slice.len();

        // The task budget is the smaller of the pool's thread number and `max_tasks`. It is
        // clamped to at least one so that a zero limit cannot cause a division by zero.
        let task_budget = task_pool
            .thread_num()
            .min(max_tasks.unwrap_or(usize::MAX))
            .max(1);

        // Round the chunk size *up*: rounding down leaves a remainder chunk, which would spawn
        // one task more than the budget allows. The ceiling is spelled without
        // `len + task_budget - 1` so it cannot overflow. The final `max(1)` keeps the chunk
        // size valid for an empty slice.
        let chunk_size = (len / task_budget + usize::from(len % task_budget != 0)).max(1);

        slice.par_chunk_map(task_pool, chunk_size, f)
    }
}

// Blanket implementation: every sized `AsRef<[T]>` type gets the parallel helpers for free.
impl<S, T: Sync> ParallelSlice<T> for S where S: AsRef<[T]> {}

/// Helper functions to map over the chunks of a mutable slice in parallel.
///
/// The trait is blanket-implemented for every sized type that is `AsMut<[T]>` (see the `impl`
/// directly below the trait), so it can be called on `Vec<T>`, arrays, `&mut [T]`, and so on.
pub trait ParallelSliceMut<T: Send>: AsMut<[T]> {
    /// Iterates in parallel over a mutable slice with a given chunk size.
    ///
    /// The slice is split with [`slice::chunks_mut`], one task is spawned on `task_pool` per
    /// chunk, and each task calls `f` with exclusive access to its chunk. The returned vector
    /// holds the values produced by `f`, as collected by the pool's `scope` call.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is 0, because [`slice::chunks_mut`] rejects a zero chunk size.
    ///
    /// # Example
    ///
    /// ```
    /// use entangled::*;
    ///
    /// let mut v = vec![42; 1000];
    /// let task_pool = TaskPool::default();
    ///
    /// let outputs = v.par_chunk_map_mut(&task_pool, 512, |numbers| -> i32 {
    ///     for number in numbers.iter_mut() {
    ///         *number *= 2;
    ///     }
    ///     numbers.iter().sum()
    /// });
    ///
    /// let mut sum = 0;
    /// for output in outputs {
    ///     sum += output;
    /// }
    ///
    /// assert_eq!(sum, 1000 * 42 * 2);
    /// assert_eq!(v[0], 84);
    /// ```
    fn par_chunk_map_mut<F, R>(&mut self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec<R>
    where
        F: Fn(&mut [T]) -> R + Send + Sync,
        R: Send + 'static,
    {
        let slice = self.as_mut();
        // Every task shares the same closure by reference; `&F` is `Copy`, so the
        // `async move` block below takes its own copy of the reference.
        let f = &f;
        task_pool.scope(|scope| {
            for chunk in slice.chunks_mut(chunk_size) {
                scope.spawn(async move { f(chunk) });
            }
        })
    }

    /// Iterates in parallel over a mutable slice with a given maximum of tasks.
    ///
    /// The slice is split into chunks of near-equal size so that the number of spawned tasks
    /// never exceeds the smaller of `max_tasks` and the thread number of the `TaskPool`. If
    /// `max_tasks` is `None`, only the thread number of the `TaskPool` limits the task count.
    /// A limit of zero (either `Some(0)` or a pool reporting zero threads) is treated as one,
    /// so the whole slice is then processed by a single task. An empty slice spawns no tasks.
    ///
    /// # Example
    ///
    /// ```
    /// use entangled::*;
    ///
    /// let mut v = vec![42; 1000];
    /// let task_pool = TaskPool::default();
    ///
    /// let outputs = v.par_splat_map_mut(&task_pool, None, |numbers| -> i32 {
    ///     for number in numbers.iter_mut() {
    ///         *number *= 2;
    ///     }
    ///     numbers.iter().sum()
    /// });
    ///
    /// let mut sum = 0;
    /// for output in outputs {
    ///     sum += output;
    /// }
    ///
    /// assert_eq!(sum, 1000 * 42 * 2);
    /// assert_eq!(v[0], 84);
    /// ```
    fn par_splat_map_mut<F, R>(
        &mut self,
        task_pool: &TaskPool,
        max_tasks: Option<usize>,
        f: F,
    ) -> Vec<R>
    where
        F: Fn(&mut [T]) -> R + Send + Sync,
        R: Send + 'static,
    {
        // The binding stays `mut`: `par_chunk_map_mut` takes `&mut self` and is invoked on this
        // `&mut [T]` binding itself, which needs a mutable borrow of the binding.
        let mut slice = self.as_mut();
        let len = slice.len();

        // The task budget is the smaller of the pool's thread number and `max_tasks`. It is
        // clamped to at least one so that a zero limit cannot cause a division by zero.
        let task_budget = task_pool
            .thread_num()
            .min(max_tasks.unwrap_or(usize::MAX))
            .max(1);

        // Round the chunk size *up*: rounding down leaves a remainder chunk, which would spawn
        // one task more than the budget allows. The ceiling is spelled without
        // `len + task_budget - 1` so it cannot overflow. The final `max(1)` keeps the chunk
        // size valid for an empty slice.
        let chunk_size = (len / task_budget + usize::from(len % task_budget != 0)).max(1);

        slice.par_chunk_map_mut(task_pool, chunk_size, f)
    }
}

// Blanket implementation: every sized `AsMut<[T]>` type gets the parallel helpers for free.
impl<S, T: Send> ParallelSliceMut<T> for S where S: AsMut<[T]> {}