1use parking_lot::Mutex;
2
3use crate::array::iterator::distributed_iterator::*;
4use crate::array::iterator::local_iterator::*;
5use crate::array::iterator::one_sided_iterator::OneSidedIter;
6use crate::array::iterator::{private::*, LamellarArrayIterators, LamellarArrayMutIterators};
7use crate::array::local_lock_atomic::*;
8use crate::array::private::LamellarArrayPrivate;
9use crate::array::r#unsafe::private::UnsafeArrayInner;
10use crate::array::*;
11use crate::darc::local_rw_darc::LocalRwDarcWriteGuard;
12use crate::memregion::Dist;
13
14use self::iterator::IterLockFuture;
15
16impl<T> InnerArray for LocalLockArray<T> {
17 fn as_inner(&self) -> &UnsafeArrayInner {
18 &self.array.inner
19 }
20}
21
22#[derive(Clone)]
24pub struct LocalLockDistIter<'a, T: Dist> {
25 data: LocalLockArray<T>,
26 lock: Arc<Mutex<Option<LocalRwDarcReadGuard<()>>>>,
27 cur_i: usize,
28 end_i: usize,
29 _marker: PhantomData<&'a T>,
30}
31
32impl<'a, T: Dist> InnerIter for LocalLockDistIter<'a, T> {
33 fn lock_if_needed(&self, _s: Sealed) -> Option<IterLockFuture> {
34 if self.lock.lock().is_none() {
39 let lock_handle = self.data.lock.read();
41 let lock = self.lock.clone();
42
43 Some(Box::pin(async move {
44 *lock.lock() = Some(lock_handle.await);
46 }))
48 } else {
49 None
50 }
51 }
52 fn iter_clone(&self, _s: Sealed) -> Self {
53 LocalLockDistIter {
54 data: self.data.clone(),
55 lock: self.lock.clone(),
56 cur_i: self.cur_i,
57 end_i: self.end_i,
58 _marker: PhantomData,
59 }
60 }
61}
62
63impl<'a, T: Dist> std::fmt::Debug for LocalLockDistIter<'a, T> {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 write!(
66 f,
67 "LocalLockDistIter{{ data.len: {:?}, cur_i: {:?}, end_i: {:?} }}",
68 self.data.len(),
69 self.cur_i,
70 self.end_i
71 )
72 }
73}
74
75#[derive(Clone)]
77pub struct LocalLockLocalIter<'a, T: Dist> {
78 data: LocalLockArray<T>,
79 lock: Arc<Mutex<Option<LocalRwDarcReadGuard<()>>>>,
80 cur_i: usize,
81 end_i: usize,
82 _marker: PhantomData<&'a T>,
83}
84
85impl<'a, T: Dist> InnerIter for LocalLockLocalIter<'a, T> {
86 fn lock_if_needed(&self, _s: Sealed) -> Option<IterLockFuture> {
87 if self.lock.lock().is_none() {
92 let lock_handle = self.data.lock.read();
94 let lock = self.lock.clone();
95
96 Some(Box::pin(async move {
97 *lock.lock() = Some(lock_handle.await);
99 }))
101 } else {
102 None
103 }
104 }
105 fn iter_clone(&self, _s: Sealed) -> Self {
106 LocalLockLocalIter {
107 data: self.data.clone(),
108 lock: self.lock.clone(),
109 cur_i: self.cur_i,
110 end_i: self.end_i,
111 _marker: PhantomData,
112 }
113 }
114}
115
116impl<'a, T: Dist> std::fmt::Debug for LocalLockLocalIter<'a, T> {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 write!(
119 f,
120 "LocalLockLocalIter{{ data.len: {:?}, cur_i: {:?}, end_i: {:?} }}",
121 self.data.len(),
122 self.cur_i,
123 self.end_i
124 )
125 }
126}
127
128impl<T: Dist + 'static> DistributedIterator for LocalLockDistIter<'static, T> {
129 type Item = &'static T;
130 type Array = LocalLockArray<T>;
131 fn init(&self, start_i: usize, cnt: usize, _s: Sealed) -> Self {
132 let max_i = self.data.num_elems_local();
133 LocalLockDistIter {
135 data: self.data.clone(),
136 lock: self.lock.clone(),
137 cur_i: std::cmp::min(start_i, max_i),
138 end_i: std::cmp::min(start_i + cnt, max_i),
139 _marker: PhantomData,
140 }
141 }
142 fn array(&self) -> Self::Array {
143 self.data.clone()
144 }
145 fn next(&mut self) -> Option<Self::Item> {
146 if self.cur_i < self.end_i {
147 self.cur_i += 1;
148 unsafe {
149 self.data
150 .array
151 .local_as_ptr()
152 .offset((self.cur_i - 1) as isize)
153 .as_ref()
154 }
155 } else {
156 None
157 }
158 }
159 fn elems(&self, in_elems: usize) -> usize {
160 in_elems
161 }
162 fn advance_index(&mut self, count: usize) {
163 self.cur_i = std::cmp::min(self.cur_i + count, self.end_i);
164 }
165}
166impl<T: Dist + 'static> IndexedDistributedIterator for LocalLockDistIter<'static, T> {
167 fn iterator_index(&self, index: usize) -> Option<usize> {
168 let g_index = self.data.subarray_index_from_local(index, 1);
169 g_index
170 }
171}
172
173impl<T: Dist + 'static> LocalIterator for LocalLockLocalIter<'static, T> {
174 type Item = &'static T;
175 type Array = LocalLockArray<T>;
176 fn init(&self, start_i: usize, cnt: usize, _s: Sealed) -> Self {
177 let max_i = self.data.num_elems_local();
178 LocalLockLocalIter {
180 data: self.data.clone(),
181 lock: self.lock.clone(),
182 cur_i: std::cmp::min(start_i, max_i),
183 end_i: std::cmp::min(start_i + cnt, max_i),
184 _marker: PhantomData,
185 }
186 }
187 fn array(&self) -> Self::Array {
188 self.data.clone()
189 }
190 fn next(&mut self) -> Option<Self::Item> {
191 if self.cur_i < self.end_i {
192 self.cur_i += 1;
193 unsafe {
194 self.data
195 .array
196 .local_as_ptr()
197 .offset((self.cur_i - 1) as isize)
198 .as_ref()
199 }
200 } else {
201 None
202 }
203 }
204 fn elems(&self, in_elems: usize) -> usize {
205 in_elems
206 }
207
208 fn advance_index(&mut self, count: usize) {
209 self.cur_i = std::cmp::min(self.cur_i + count, self.end_i);
210 }
211}
212
213impl<T: Dist + 'static> IndexedLocalIterator for LocalLockLocalIter<'static, T> {
214 fn iterator_index(&self, index: usize) -> Option<usize> {
215 if index < self.data.len() {
216 Some(index) } else {
218 None
219 }
220 }
221}
222
223pub struct LocalLockDistIterMut<'a, T: Dist> {
224 data: LocalLockArray<T>,
225 lock: Arc<Mutex<Option<LocalRwDarcWriteGuard<()>>>>,
226 cur_i: usize,
227 end_i: usize,
228 _marker: PhantomData<&'a T>,
229}
230
231impl<'a, T: Dist> InnerIter for LocalLockDistIterMut<'a, T> {
232 fn lock_if_needed(&self, _s: Sealed) -> Option<IterLockFuture> {
233 if self.lock.lock().is_none() {
238 let lock_handle = self.data.lock.write();
240 let lock = self.lock.clone();
241
242 Some(Box::pin(async move {
243 *lock.lock() = Some(lock_handle.await);
245 }))
247 } else {
248 None
249 }
250 }
251 fn iter_clone(&self, _s: Sealed) -> Self {
252 LocalLockDistIterMut {
253 data: self.data.clone(),
254 lock: self.lock.clone(),
255 cur_i: self.cur_i,
256 end_i: self.end_i,
257 _marker: PhantomData,
258 }
259 }
260}
261
262impl<'a, T: Dist> std::fmt::Debug for LocalLockDistIterMut<'a, T> {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 write!(
265 f,
266 "LocalLockDistIterMut{{ data.len: {:?}, cur_i: {:?}, end_i: {:?} }}",
267 self.data.len(),
268 self.cur_i,
269 self.end_i
270 )
271 }
272}
273
274pub struct LocalLockLocalIterMut<'a, T: Dist> {
275 data: LocalLockArray<T>,
276 lock: Arc<Mutex<Option<LocalRwDarcWriteGuard<()>>>>,
277 cur_i: usize,
278 end_i: usize,
279 _marker: PhantomData<&'a T>,
280}
281
282impl<'a, T: Dist> InnerIter for LocalLockLocalIterMut<'a, T> {
283 fn lock_if_needed(&self, _s: Sealed) -> Option<IterLockFuture> {
284 if self.lock.lock().is_none() {
289 let lock_handle = self.data.lock.write();
291 let lock = self.lock.clone();
292
293 Some(Box::pin(async move {
294 *lock.lock() = Some(lock_handle.await);
296 }))
298 } else {
299 None
300 }
301 }
302 fn iter_clone(&self, _s: Sealed) -> Self {
303 LocalLockLocalIterMut {
304 data: self.data.clone(),
305 lock: self.lock.clone(),
306 cur_i: self.cur_i,
307 end_i: self.end_i,
308 _marker: PhantomData,
309 }
310 }
311}
312
313impl<'a, T: Dist> std::fmt::Debug for LocalLockLocalIterMut<'a, T> {
314 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
315 write!(
316 f,
317 "LocalLockLocalIterMut{{ data.len: {:?}, cur_i: {:?}, end_i: {:?} }}",
318 self.data.len(),
319 self.cur_i,
320 self.end_i
321 )
322 }
323}
324
325impl<T: Dist + 'static> DistributedIterator for LocalLockDistIterMut<'static, T> {
326 type Item = &'static mut T;
327 type Array = LocalLockArray<T>;
328 fn init(&self, start_i: usize, cnt: usize, _s: Sealed) -> Self {
329 let max_i = self.data.num_elems_local();
330 LocalLockDistIterMut {
332 data: self.data.clone(),
333 lock: self.lock.clone(),
334 cur_i: std::cmp::min(start_i, max_i),
335 end_i: std::cmp::min(start_i + cnt, max_i),
336 _marker: PhantomData,
337 }
338 }
339 fn array(&self) -> Self::Array {
340 self.data.clone()
341 }
342 fn next(&mut self) -> Option<Self::Item> {
343 if self.cur_i < self.end_i {
344 self.cur_i += 1;
345 unsafe {
346 Some(
347 &mut *self
348 .data
349 .array
350 .local_as_mut_ptr()
351 .offset((self.cur_i - 1) as isize),
352 )
353 }
354 } else {
355 None
356 }
357 }
358 fn elems(&self, in_elems: usize) -> usize {
359 in_elems
360 }
361
362 fn advance_index(&mut self, count: usize) {
363 self.cur_i = std::cmp::min(self.cur_i + count, self.end_i);
364 }
365}
366
367impl<T: Dist + 'static> IndexedDistributedIterator for LocalLockDistIterMut<'static, T> {
368 fn iterator_index(&self, index: usize) -> Option<usize> {
369 let g_index = self.data.subarray_index_from_local(index, 1);
370 g_index
371 }
372}
373
374impl<T: Dist + 'static> LocalIterator for LocalLockLocalIterMut<'static, T> {
375 type Item = &'static mut T;
376 type Array = LocalLockArray<T>;
377 fn init(&self, start_i: usize, cnt: usize, _s: Sealed) -> Self {
378 let max_i = self.data.num_elems_local();
379 LocalLockLocalIterMut {
381 data: self.data.clone(),
382 lock: self.lock.clone(),
383 cur_i: std::cmp::min(start_i, max_i),
384 end_i: std::cmp::min(start_i + cnt, max_i),
385 _marker: PhantomData,
386 }
387 }
388 fn array(&self) -> Self::Array {
389 self.data.clone()
390 }
391 fn next(&mut self) -> Option<Self::Item> {
392 if self.cur_i < self.end_i {
393 self.cur_i += 1;
394 unsafe {
395 Some(
396 &mut *self
397 .data
398 .array
399 .local_as_mut_ptr()
400 .offset((self.cur_i - 1) as isize),
401 )
402 }
403 } else {
404 None
405 }
406 }
407 fn elems(&self, in_elems: usize) -> usize {
408 in_elems
409 }
410
411 fn advance_index(&mut self, count: usize) {
412 self.cur_i = std::cmp::min(self.cur_i + count, self.end_i);
413 }
414}
415
416impl<T: Dist + 'static> IndexedLocalIterator for LocalLockLocalIterMut<'static, T> {
417 fn iterator_index(&self, index: usize) -> Option<usize> {
418 if index < self.data.len() {
419 Some(index) } else {
421 None
422 }
423 }
424}
425
426impl<T: Dist> LamellarArrayIterators<T> for LocalLockArray<T> {
427 type DistIter = LocalLockDistIter<'static, T>;
429 type LocalIter = LocalLockLocalIter<'static, T>;
430 type OnesidedIter = OneSidedIter<'static, T, Self>;
431
432 fn dist_iter(&self) -> Self::DistIter {
433 LocalLockDistIter {
434 data: self.clone(),
435 lock: Arc::new(Mutex::new(None)),
436 cur_i: 0,
437 end_i: 0,
438 _marker: PhantomData,
439 }
440 }
441
442 fn local_iter(&self) -> Self::LocalIter {
443 LocalLockLocalIter {
444 data: self.clone(),
445 lock: Arc::new(Mutex::new(None)),
446 cur_i: 0,
447 end_i: 0,
448 _marker: PhantomData,
449 }
450 }
451
452 fn onesided_iter(&self) -> Self::OnesidedIter {
453 OneSidedIter::new(self.clone(), self.array.team_rt(), 1)
454 }
455
456 fn buffered_onesided_iter(&self, buf_size: usize) -> Self::OnesidedIter {
457 OneSidedIter::new(
458 self.clone(),
459 self.array.team_rt(),
460 std::cmp::min(buf_size, self.len()),
461 )
462 }
463}
464
465impl<T: Dist> LamellarArrayMutIterators<T> for LocalLockArray<T> {
466 type DistIter = LocalLockDistIterMut<'static, T>;
467 type LocalIter = LocalLockLocalIterMut<'static, T>;
468
469 fn dist_iter_mut(&self) -> Self::DistIter {
470 LocalLockDistIterMut {
471 data: self.clone(),
472 lock: Arc::new(Mutex::new(None)),
473 cur_i: 0,
474 end_i: 0,
475 _marker: PhantomData,
476 }
477 }
478
479 fn local_iter_mut(&self) -> Self::LocalIter {
480 LocalLockLocalIterMut {
482 data: self.clone(),
483 lock: Arc::new(Mutex::new(None)),
484 cur_i: 0,
485 end_i: 0,
486 _marker: PhantomData,
487 }
488 }
489}
490
491impl<T: Dist> DistIteratorLauncher for LocalLockArray<T> {}
492
493impl<T: Dist> LocalIteratorLauncher for LocalLockArray<T> {}