// concurrent_slice/chunk.rs

1use crate::{chunks::Chunks, common::*, guard::Guard};
2
/// A mutable sub-slice reference-counted reference to a slice-like data.
///
/// `S` is the owning slice-like container and `T` is its element type. Each
/// `Chunk` keeps the owner alive through an [`Arc`] and points at an
/// exclusive sub-slice of it, so disjoint chunks can be processed in
/// parallel without touching each other's elements.
#[derive(Debug)]
pub struct Chunk<S, T> {
    // Shared handle that keeps the owning data alive.
    pub(super) data: Arc<S>,
    // Raw pointer to this chunk's exclusive sub-slice of the owner's data.
    pub(super) slice: NonNull<[T]>,
}
9
impl<S, T> Chunk<S, T> {
    /// Splits the chunk into two sub-chunks, divided at specified index.
    ///
    /// Both halves keep a reference count on the same owning data: the left
    /// half covers `[0, index)` and the right half covers `[index, len)`.
    ///
    /// # Panics
    /// The method panics if the index is out of bound.
    pub fn split_at(mut self, index: usize) -> (Chunk<S, T>, Chunk<S, T>)
    where
        S: AsMut<[T]>,
        T: 'static + Send,
    {
        unsafe {
            let data = self.data;
            // SAFETY: `self` is consumed, so this chunk's exclusive sub-slice
            // can be re-borrowed mutably without aliasing any other chunk.
            let slice: &mut [T] = self.slice.as_mut();
            // SAFETY: the pointers are cast from references, hence non-null.
            // The slice indexing panics (rather than yielding UB) when `index`
            // is out of bound.
            let lslice = NonNull::new_unchecked(&mut slice[0..index] as *mut [T]);
            let rslice = NonNull::new_unchecked(&mut slice[index..] as *mut [T]);

            (
                Chunk {
                    data: data.clone(),
                    slice: lslice,
                },
                Chunk {
                    data,
                    slice: rslice,
                },
            )
        }
    }

    /// Returns an iterator of roughly fixed-sized chunks of the chunk.
    ///
    /// Each chunk has `chunk_size` elements, except the last chunk may be
    /// shorter if there aren't enough elements.
    ///
    /// The yielded chunks maintain a global reference count on owning data. Each chunk refers to
    /// a mutable and exclusive sub-slice, enabling concurrent processing on input data.
    ///
    /// # Panics
    /// The method panics if `chunk_size` is zero and slice length is not zero.
    pub fn chunks(self, chunk_size: usize) -> Chunks<S, T>
    where
        S: 'static + AsMut<[T]> + Sized + Send,
        T: 'static + Send,
    {
        unsafe {
            let data = self.data;
            // NOTE(review): this forges a `&mut S` out of `Arc::as_ptr`, i.e.
            // out of shared ownership. It is only used to locate the owner's
            // base pointer, but deriving `&mut` from a shared `Arc` is dubious
            // under stacked borrows — verify with Miri.
            let data_ptr = Arc::as_ptr(&data) as *mut S;
            let data_slice = data_ptr.as_mut().unwrap().as_mut();

            // Convert this chunk's sub-slice into an index range on the owner;
            // `Chunks` iterates over `index..end` positions of the owner.
            let slice_len = self.slice.as_ref().len();
            let slice_ptr = self.slice.as_ref().as_ptr();
            // SAFETY: `slice_ptr` points inside the owner's slice, so the
            // offset is non-negative and the cast to `usize` is lossless.
            let start = slice_ptr.offset_from(data_slice.as_ptr()) as usize;

            assert!(
                slice_len == 0 || chunk_size > 0,
                "chunk_size must be positive for non-empty slice"
            );

            Chunks {
                chunk_size,
                index: start,
                end: start + slice_len,
                data,
                _phantom: PhantomData,
            }
        }
    }

    /// Returns an iterator with roughly `division` length of roughly fixed-sized chunks of the chunk.
    ///
    /// The chunk size is determined by `division`. The last chunk may be
    /// shorter if there aren't enough elements. If `division` is `None`, it
    /// defaults to the number of system processors.
    ///
    /// # Panics
    /// The method panics if `division` is zero and slice length is not zero.
    pub fn chunks_by_division(self, division: impl Into<Option<usize>>) -> Chunks<S, T>
    where
        S: 'static + AsMut<[T]> + Sized + Send,
        T: 'static + Send,
    {
        unsafe {
            let division = division.into().unwrap_or_else(num_cpus::get);

            let data = self.data;
            // NOTE(review): same `&mut`-from-`Arc::as_ptr` pattern as in
            // `chunks()`; see the note there.
            let data_ptr = Arc::as_ptr(&data) as *mut S;
            let data_slice = data_ptr.as_mut().unwrap().as_mut();

            let slice_len = self.slice.as_ref().len();
            let slice_ptr = self.slice.as_ref().as_ptr();
            // SAFETY: `slice_ptr` points inside the owner's slice, so the
            // offset is non-negative and the cast to `usize` is lossless.
            let start = slice_ptr.offset_from(data_slice.as_ptr()) as usize;

            // Ceiling division so that at most `division` chunks are produced;
            // an empty slice gets a zero chunk size (and yields no chunks).
            let chunk_size = if slice_len == 0 {
                0
            } else {
                assert!(division > 0, "division must be positive, but get zero");
                (slice_len + division - 1) / division
            };

            Chunks {
                index: start,
                chunk_size,
                end: start + slice_len,
                data,
                _phantom: PhantomData,
            }
        }
    }

    /// Returns the guard that is used to recover the owning data.
    pub fn guard(&self) -> Guard<S> {
        Guard {
            data: self.data.clone(),
        }
    }

    /// Gets the reference count on the owning data.
    pub fn ref_count(&self) -> usize {
        Arc::strong_count(&self.data)
    }

    /// Concatenates contiguous chunks into one chunk.
    ///
    /// # Panics
    /// The method panics if the chunks are not contiguous, or
    /// the chunks refer to inconsistent data.
    pub fn cat(chunks: impl IntoIterator<Item = Self>) -> Self
    where
        S: AsMut<[T]>,
    {
        unsafe {
            let mut chunks = chunks.into_iter();

            // obtain inner pointer from the first chunk
            let first = chunks.next().expect("the chunks must be non-empty");
            let data = first.data.clone();

            let mut chunks: Vec<_> = iter::once(first)
                .chain(chunks.inspect(|chunk| {
                    // verify if all chunks points to the same owner
                    assert_eq!(
                        Arc::as_ptr(&chunk.data),
                        Arc::as_ptr(&data),
                        "inconsistent owner of the chunks"
                    );
                }))
                .collect();

            // verify if chunks are contiguous (each chunk ends exactly where
            // the next one starts)
            chunks
                .iter()
                .zip(chunks.iter().skip(1))
                .for_each(|(prev, next)| {
                    let prev_end = prev.slice.as_ref().as_ptr_range().end;
                    let next_start = next.slice.as_ref().as_ptr_range().start;
                    assert!(prev_end == next_start, "the chunks are not contiguous");
                });

            // save slice range; `first_mut()` cannot fail because the vector
            // contains at least `first`
            let len = chunks.iter().map(|chunk| chunk.slice.as_ref().len()).sum();
            let slice_ptr: *mut T = chunks.first_mut().unwrap().as_mut().as_mut_ptr();

            // free chunk references
            drop(chunks);

            // create returning chunk
            // SAFETY: the merged range was verified contiguous above, and it
            // stays valid because `data` still keeps the owner alive.
            let slice = {
                let slice = slice::from_raw_parts_mut(slice_ptr, len);
                NonNull::new_unchecked(slice as *mut [T])
            };

            Chunk { data, slice }
        }
    }

    /// Converts the chunk into an [`ArcRef`] over its sub-slice, keeping the
    /// owning data alive inside the returned reference.
    pub fn into_arc_ref(self) -> ArcRef<S, [T]> {
        unsafe {
            let Self { data, slice } = self;
            // SAFETY: `slice` stays valid for the life of the returned value
            // because `data` (the owner) is stored inside the `ArcRef`.
            ArcRef::new(data).map(|_| slice.as_ref())
        }
    }
}
192
// NOTE(review): these impls place no bounds on `S` or `T`. Sending a
// `Chunk<S, T>` across threads moves the `Arc<S>` and mutable access to the
// `[T]` sub-slice with it, which is generally only sound when `S: Send + Sync`
// and `T: Send`. The chunk-producing methods above do require `T: Send`, but
// the impls themselves do not enforce any bound — consider tightening these
// to `unsafe impl<S: Send + Sync, T: Send> ...`; confirm with Miri.
unsafe impl<S, T> Send for Chunk<S, T> {}
unsafe impl<S, T> Sync for Chunk<S, T> {}
195
196impl<S, T> AsRef<[T]> for Chunk<S, T> {
197    fn as_ref(&self) -> &[T] {
198        self.deref()
199    }
200}
201
202impl<S, T> AsMut<[T]> for Chunk<S, T> {
203    fn as_mut(&mut self) -> &mut [T] {
204        self.deref_mut()
205    }
206}
207
208impl<S, T> Deref for Chunk<S, T> {
209    type Target = [T];
210
211    fn deref(&self) -> &Self::Target {
212        unsafe { self.slice.as_ref() }
213    }
214}
215
216impl<S, T> DerefMut for Chunk<S, T> {
217    fn deref_mut(&mut self) -> &mut Self::Target {
218        unsafe { self.slice.as_mut() }
219    }
220}
221
222impl<'a, S, T> IntoIterator for &'a Chunk<S, T> {
223    type Item = &'a T;
224    type IntoIter = slice::Iter<'a, T>;
225
226    fn into_iter(self) -> Self::IntoIter {
227        self.deref().iter()
228    }
229}
230
231impl<'a, S, T> IntoIterator for &'a mut Chunk<S, T> {
232    type Item = &'a mut T;
233    type IntoIter = slice::IterMut<'a, T>;
234
235    fn into_iter(self) -> Self::IntoIter {
236        self.deref_mut().iter_mut()
237    }
238}