lamellar/array/local_lock_atomic/
iteration.rs

1use parking_lot::Mutex;
2
3use crate::array::iterator::distributed_iterator::*;
4use crate::array::iterator::local_iterator::*;
5use crate::array::iterator::one_sided_iterator::OneSidedIter;
6use crate::array::iterator::{private::*, LamellarArrayIterators, LamellarArrayMutIterators};
7use crate::array::local_lock_atomic::*;
8use crate::array::private::LamellarArrayPrivate;
9use crate::array::r#unsafe::private::UnsafeArrayInner;
10use crate::array::*;
11use crate::darc::local_rw_darc::LocalRwDarcWriteGuard;
12use crate::memregion::Dist;
13
14use self::iterator::IterLockFuture;
15
16impl<T> InnerArray for LocalLockArray<T> {
17    fn as_inner(&self) -> &UnsafeArrayInner {
18        &self.array.inner
19    }
20}
21
22//#[doc(hidden)]
23#[derive(Clone)]
24pub struct LocalLockDistIter<'a, T: Dist> {
25    data: LocalLockArray<T>,
26    lock: Arc<Mutex<Option<LocalRwDarcReadGuard<()>>>>,
27    cur_i: usize,
28    end_i: usize,
29    _marker: PhantomData<&'a T>,
30}
31
32impl<'a, T: Dist> InnerIter for LocalLockDistIter<'a, T> {
33    fn lock_if_needed(&self, _s: Sealed) -> Option<IterLockFuture> {
34        // println!(
35        //     " LocalLockDistIter lock_if_needed: {:?}",
36        //     std::thread::current().id()
37        // );
38        if self.lock.lock().is_none() {
39            // println!("LocalLockDistIter need to get read handle");
40            let lock_handle = self.data.lock.read();
41            let lock = self.lock.clone();
42
43            Some(Box::pin(async move {
44                // println!("LocalLockDistIter trying to get read handle");
45                *lock.lock() = Some(lock_handle.await);
46                // println!("LocalLockDistIter got the read lock");
47            }))
48        } else {
49            None
50        }
51    }
52    fn iter_clone(&self, _s: Sealed) -> Self {
53        LocalLockDistIter {
54            data: self.data.clone(),
55            lock: self.lock.clone(),
56            cur_i: self.cur_i,
57            end_i: self.end_i,
58            _marker: PhantomData,
59        }
60    }
61}
62
63impl<'a, T: Dist> std::fmt::Debug for LocalLockDistIter<'a, T> {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        write!(
66            f,
67            "LocalLockDistIter{{ data.len: {:?}, cur_i: {:?}, end_i: {:?} }}",
68            self.data.len(),
69            self.cur_i,
70            self.end_i
71        )
72    }
73}
74
75//#[doc(hidden)]
76#[derive(Clone)]
77pub struct LocalLockLocalIter<'a, T: Dist> {
78    data: LocalLockArray<T>,
79    lock: Arc<Mutex<Option<LocalRwDarcReadGuard<()>>>>,
80    cur_i: usize,
81    end_i: usize,
82    _marker: PhantomData<&'a T>,
83}
84
85impl<'a, T: Dist> InnerIter for LocalLockLocalIter<'a, T> {
86    fn lock_if_needed(&self, _s: Sealed) -> Option<IterLockFuture> {
87        // println!(
88        //     " LocalLockLocalIter lock_if_needed: {:?}",
89        //     std::thread::current().id()
90        // );
91        if self.lock.lock().is_none() {
92            // println!("LocalLockLocalIter need to get read handle");
93            let lock_handle = self.data.lock.read();
94            let lock = self.lock.clone();
95
96            Some(Box::pin(async move {
97                // println!("LocalLockLocalIter trying to get read handle");
98                *lock.lock() = Some(lock_handle.await);
99                // println!("LocalLockLocalIter got the read lock");
100            }))
101        } else {
102            None
103        }
104    }
105    fn iter_clone(&self, _s: Sealed) -> Self {
106        LocalLockLocalIter {
107            data: self.data.clone(),
108            lock: self.lock.clone(),
109            cur_i: self.cur_i,
110            end_i: self.end_i,
111            _marker: PhantomData,
112        }
113    }
114}
115
116impl<'a, T: Dist> std::fmt::Debug for LocalLockLocalIter<'a, T> {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        write!(
119            f,
120            "LocalLockLocalIter{{ data.len: {:?}, cur_i: {:?}, end_i: {:?} }}",
121            self.data.len(),
122            self.cur_i,
123            self.end_i
124        )
125    }
126}
127
128impl<T: Dist + 'static> DistributedIterator for LocalLockDistIter<'static, T> {
129    type Item = &'static T;
130    type Array = LocalLockArray<T>;
131    fn init(&self, start_i: usize, cnt: usize, _s: Sealed) -> Self {
132        let max_i = self.data.num_elems_local();
133        // println!("init dist iter start_i: {:?} cnt {:?} end_i: {:?} max_i: {:?}",start_i,cnt, start_i+cnt,max_i);
134        LocalLockDistIter {
135            data: self.data.clone(),
136            lock: self.lock.clone(),
137            cur_i: std::cmp::min(start_i, max_i),
138            end_i: std::cmp::min(start_i + cnt, max_i),
139            _marker: PhantomData,
140        }
141    }
142    fn array(&self) -> Self::Array {
143        self.data.clone()
144    }
145    fn next(&mut self) -> Option<Self::Item> {
146        if self.cur_i < self.end_i {
147            self.cur_i += 1;
148            unsafe {
149                self.data
150                    .array
151                    .local_as_ptr()
152                    .offset((self.cur_i - 1) as isize)
153                    .as_ref()
154            }
155        } else {
156            None
157        }
158    }
159    fn elems(&self, in_elems: usize) -> usize {
160        in_elems
161    }
162    fn advance_index(&mut self, count: usize) {
163        self.cur_i = std::cmp::min(self.cur_i + count, self.end_i);
164    }
165}
166impl<T: Dist + 'static> IndexedDistributedIterator for LocalLockDistIter<'static, T> {
167    fn iterator_index(&self, index: usize) -> Option<usize> {
168        let g_index = self.data.subarray_index_from_local(index, 1);
169        g_index
170    }
171}
172
173impl<T: Dist + 'static> LocalIterator for LocalLockLocalIter<'static, T> {
174    type Item = &'static T;
175    type Array = LocalLockArray<T>;
176    fn init(&self, start_i: usize, cnt: usize, _s: Sealed) -> Self {
177        let max_i = self.data.num_elems_local();
178        // println!("init dist iter start_i: {:?} cnt {:?} end_i: {:?} max_i: {:?}",start_i,cnt, start_i+cnt,max_i);
179        LocalLockLocalIter {
180            data: self.data.clone(),
181            lock: self.lock.clone(),
182            cur_i: std::cmp::min(start_i, max_i),
183            end_i: std::cmp::min(start_i + cnt, max_i),
184            _marker: PhantomData,
185        }
186    }
187    fn array(&self) -> Self::Array {
188        self.data.clone()
189    }
190    fn next(&mut self) -> Option<Self::Item> {
191        if self.cur_i < self.end_i {
192            self.cur_i += 1;
193            unsafe {
194                self.data
195                    .array
196                    .local_as_ptr()
197                    .offset((self.cur_i - 1) as isize)
198                    .as_ref()
199            }
200        } else {
201            None
202        }
203    }
204    fn elems(&self, in_elems: usize) -> usize {
205        in_elems
206    }
207
208    fn advance_index(&mut self, count: usize) {
209        self.cur_i = std::cmp::min(self.cur_i + count, self.end_i);
210    }
211}
212
213impl<T: Dist + 'static> IndexedLocalIterator for LocalLockLocalIter<'static, T> {
214    fn iterator_index(&self, index: usize) -> Option<usize> {
215        if index < self.data.len() {
216            Some(index) //everyone at this point as calculated the actual index (cause we are local only) so just return it
217        } else {
218            None
219        }
220    }
221}
222
223pub struct LocalLockDistIterMut<'a, T: Dist> {
224    data: LocalLockArray<T>,
225    lock: Arc<Mutex<Option<LocalRwDarcWriteGuard<()>>>>,
226    cur_i: usize,
227    end_i: usize,
228    _marker: PhantomData<&'a T>,
229}
230
231impl<'a, T: Dist> InnerIter for LocalLockDistIterMut<'a, T> {
232    fn lock_if_needed(&self, _s: Sealed) -> Option<IterLockFuture> {
233        // println!(
234        //     " LocalLockDistIterMut lock_if_needed: {:?}",
235        //     std::thread::current().id()
236        // );
237        if self.lock.lock().is_none() {
238            // println!("LocalLockDistIterMut need to get write handle");
239            let lock_handle = self.data.lock.write();
240            let lock = self.lock.clone();
241
242            Some(Box::pin(async move {
243                // println!("LocalLockDistIterMut trying to get write handle");
244                *lock.lock() = Some(lock_handle.await);
245                // println!("LocalLockDistIterMut got the write lock");
246            }))
247        } else {
248            None
249        }
250    }
251    fn iter_clone(&self, _s: Sealed) -> Self {
252        LocalLockDistIterMut {
253            data: self.data.clone(),
254            lock: self.lock.clone(),
255            cur_i: self.cur_i,
256            end_i: self.end_i,
257            _marker: PhantomData,
258        }
259    }
260}
261
262impl<'a, T: Dist> std::fmt::Debug for LocalLockDistIterMut<'a, T> {
263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264        write!(
265            f,
266            "LocalLockDistIterMut{{ data.len: {:?}, cur_i: {:?}, end_i: {:?} }}",
267            self.data.len(),
268            self.cur_i,
269            self.end_i
270        )
271    }
272}
273
274pub struct LocalLockLocalIterMut<'a, T: Dist> {
275    data: LocalLockArray<T>,
276    lock: Arc<Mutex<Option<LocalRwDarcWriteGuard<()>>>>,
277    cur_i: usize,
278    end_i: usize,
279    _marker: PhantomData<&'a T>,
280}
281
282impl<'a, T: Dist> InnerIter for LocalLockLocalIterMut<'a, T> {
283    fn lock_if_needed(&self, _s: Sealed) -> Option<IterLockFuture> {
284        // println!(
285        //     " LocalLockLocalIterMut lock_if_needed: {:?}",
286        //     std::thread::current().id()
287        // );
288        if self.lock.lock().is_none() {
289            // println!("LocalLockLocalIterMut need to get write handle");
290            let lock_handle = self.data.lock.write();
291            let lock = self.lock.clone();
292
293            Some(Box::pin(async move {
294                // println!("LocalLockLocalIterMut trying to get write handle");
295                *lock.lock() = Some(lock_handle.await);
296                // println!("LocalLockLocalIterMut got the write lock");
297            }))
298        } else {
299            None
300        }
301    }
302    fn iter_clone(&self, _s: Sealed) -> Self {
303        LocalLockLocalIterMut {
304            data: self.data.clone(),
305            lock: self.lock.clone(),
306            cur_i: self.cur_i,
307            end_i: self.end_i,
308            _marker: PhantomData,
309        }
310    }
311}
312
313impl<'a, T: Dist> std::fmt::Debug for LocalLockLocalIterMut<'a, T> {
314    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
315        write!(
316            f,
317            "LocalLockLocalIterMut{{ data.len: {:?}, cur_i: {:?}, end_i: {:?} }}",
318            self.data.len(),
319            self.cur_i,
320            self.end_i
321        )
322    }
323}
324
325impl<T: Dist + 'static> DistributedIterator for LocalLockDistIterMut<'static, T> {
326    type Item = &'static mut T;
327    type Array = LocalLockArray<T>;
328    fn init(&self, start_i: usize, cnt: usize, _s: Sealed) -> Self {
329        let max_i = self.data.num_elems_local();
330        // println!("init dist iter start_i: {:?} cnt {:?} end_i: {:?} max_i: {:?}",start_i,cnt, start_i+cnt,max_i);
331        LocalLockDistIterMut {
332            data: self.data.clone(),
333            lock: self.lock.clone(),
334            cur_i: std::cmp::min(start_i, max_i),
335            end_i: std::cmp::min(start_i + cnt, max_i),
336            _marker: PhantomData,
337        }
338    }
339    fn array(&self) -> Self::Array {
340        self.data.clone()
341    }
342    fn next(&mut self) -> Option<Self::Item> {
343        if self.cur_i < self.end_i {
344            self.cur_i += 1;
345            unsafe {
346                Some(
347                    &mut *self
348                        .data
349                        .array
350                        .local_as_mut_ptr()
351                        .offset((self.cur_i - 1) as isize),
352                )
353            }
354        } else {
355            None
356        }
357    }
358    fn elems(&self, in_elems: usize) -> usize {
359        in_elems
360    }
361
362    fn advance_index(&mut self, count: usize) {
363        self.cur_i = std::cmp::min(self.cur_i + count, self.end_i);
364    }
365}
366
367impl<T: Dist + 'static> IndexedDistributedIterator for LocalLockDistIterMut<'static, T> {
368    fn iterator_index(&self, index: usize) -> Option<usize> {
369        let g_index = self.data.subarray_index_from_local(index, 1);
370        g_index
371    }
372}
373
374impl<T: Dist + 'static> LocalIterator for LocalLockLocalIterMut<'static, T> {
375    type Item = &'static mut T;
376    type Array = LocalLockArray<T>;
377    fn init(&self, start_i: usize, cnt: usize, _s: Sealed) -> Self {
378        let max_i = self.data.num_elems_local();
379        // println!("init dist iter start_i: {:?} cnt {:?} end_i: {:?} max_i: {:?}",start_i,cnt, start_i+cnt,max_i);
380        LocalLockLocalIterMut {
381            data: self.data.clone(),
382            lock: self.lock.clone(),
383            cur_i: std::cmp::min(start_i, max_i),
384            end_i: std::cmp::min(start_i + cnt, max_i),
385            _marker: PhantomData,
386        }
387    }
388    fn array(&self) -> Self::Array {
389        self.data.clone()
390    }
391    fn next(&mut self) -> Option<Self::Item> {
392        if self.cur_i < self.end_i {
393            self.cur_i += 1;
394            unsafe {
395                Some(
396                    &mut *self
397                        .data
398                        .array
399                        .local_as_mut_ptr()
400                        .offset((self.cur_i - 1) as isize),
401                )
402            }
403        } else {
404            None
405        }
406    }
407    fn elems(&self, in_elems: usize) -> usize {
408        in_elems
409    }
410
411    fn advance_index(&mut self, count: usize) {
412        self.cur_i = std::cmp::min(self.cur_i + count, self.end_i);
413    }
414}
415
416impl<T: Dist + 'static> IndexedLocalIterator for LocalLockLocalIterMut<'static, T> {
417    fn iterator_index(&self, index: usize) -> Option<usize> {
418        if index < self.data.len() {
419            Some(index) //everyone at this point as calculated the actual index (cause we are local only) so just return it
420        } else {
421            None
422        }
423    }
424}
425
426impl<T: Dist> LamellarArrayIterators<T> for LocalLockArray<T> {
427    // type Array = LocalLockArray<T>;
428    type DistIter = LocalLockDistIter<'static, T>;
429    type LocalIter = LocalLockLocalIter<'static, T>;
430    type OnesidedIter = OneSidedIter<'static, T, Self>;
431
432    fn dist_iter(&self) -> Self::DistIter {
433        LocalLockDistIter {
434            data: self.clone(),
435            lock: Arc::new(Mutex::new(None)),
436            cur_i: 0,
437            end_i: 0,
438            _marker: PhantomData,
439        }
440    }
441
442    fn local_iter(&self) -> Self::LocalIter {
443        LocalLockLocalIter {
444            data: self.clone(),
445            lock: Arc::new(Mutex::new(None)),
446            cur_i: 0,
447            end_i: 0,
448            _marker: PhantomData,
449        }
450    }
451
452    fn onesided_iter(&self) -> Self::OnesidedIter {
453        OneSidedIter::new(self.clone(), self.array.team_rt(), 1)
454    }
455
456    fn buffered_onesided_iter(&self, buf_size: usize) -> Self::OnesidedIter {
457        OneSidedIter::new(
458            self.clone(),
459            self.array.team_rt(),
460            std::cmp::min(buf_size, self.len()),
461        )
462    }
463}
464
465impl<T: Dist> LamellarArrayMutIterators<T> for LocalLockArray<T> {
466    type DistIter = LocalLockDistIterMut<'static, T>;
467    type LocalIter = LocalLockLocalIterMut<'static, T>;
468
469    fn dist_iter_mut(&self) -> Self::DistIter {
470        LocalLockDistIterMut {
471            data: self.clone(),
472            lock: Arc::new(Mutex::new(None)),
473            cur_i: 0,
474            end_i: 0,
475            _marker: PhantomData,
476        }
477    }
478
479    fn local_iter_mut(&self) -> Self::LocalIter {
480        // println!("got write lock for iter");
481        LocalLockLocalIterMut {
482            data: self.clone(),
483            lock: Arc::new(Mutex::new(None)),
484            cur_i: 0,
485            end_i: 0,
486            _marker: PhantomData,
487        }
488    }
489}
490
491impl<T: Dist> DistIteratorLauncher for LocalLockArray<T> {}
492
493impl<T: Dist> LocalIteratorLauncher for LocalLockArray<T> {}