1mod handle;
4pub(crate) use handle::*;
5
6pub(crate) mod iteration;
7pub(crate) mod operations;
8mod rdma;
9use crate::array::atomic::AtomicElement;
10use crate::array::private::ArrayExecAm;
12use crate::array::r#unsafe::{UnsafeByteArray, UnsafeByteArrayWeak};
13use crate::array::*;
14use crate::barrier::BarrierHandle;
15use crate::darc::Darc;
16use crate::darc::DarcMode;
17use crate::lamellar_team::{IntoLamellarTeam, LamellarTeamRT};
18use crate::memregion::Dist;
19use crate::scheduler::LamellarTask;
20
21use parking_lot::{Mutex, MutexGuard};
22use serde::ser::SerializeSeq;
23use std::ops::{
26 AddAssign, BitAndAssign, BitOrAssign, BitXorAssign, DivAssign, MulAssign, RemAssign, ShlAssign,
27 ShrAssign, SubAssign,
28};
29
#[doc(hidden)]
/// Handle to a single element of a [`GenericAtomicArray`]; every access is
/// serialized through the array's per-element lock table.
pub struct GenericAtomicElement<T> {
    // Owning array: provides both the data slice and the lock for this slot.
    array: GenericAtomicArray<T>,
    // Position of this element within the local portion of the array.
    local_index: usize,
}
35
/// Wraps a `GenericAtomicElement` in the type-erased [`AtomicElement`] enum.
impl<T: Dist> From<GenericAtomicElement<T>> for AtomicElement<T> {
    fn from(element: GenericAtomicElement<T>) -> AtomicElement<T> {
        AtomicElement::GenericAtomicElement(element)
    }
}
41
42impl<T: Dist> GenericAtomicElement<T> {
43 pub fn load(&self) -> T {
44 let _lock = self.array.lock_index(self.local_index);
45 unsafe { self.array.__local_as_mut_slice()[self.local_index] }
46 }
47 pub fn store(&self, val: T) {
48 let _lock = self.array.lock_index(self.local_index);
49 unsafe {
50 self.array.__local_as_mut_slice()[self.local_index] = val;
51 }
52 }
53 pub fn swap(&self, val: T) -> T {
54 let _lock = self.array.lock_index(self.local_index);
55 unsafe {
56 let old = self.array.__local_as_mut_slice()[self.local_index];
57 self.array.__local_as_mut_slice()[self.local_index] = val;
58 old
59 }
60 }
61}
62impl<T: ElementArithmeticOps> GenericAtomicElement<T> {
63 pub fn fetch_add(&self, val: T) -> T {
64 let _lock = self.array.lock_index(self.local_index);
65 unsafe {
66 let old = self.array.__local_as_mut_slice()[self.local_index];
67 self.array.__local_as_mut_slice()[self.local_index] += val;
68 old
69 }
70 }
71 pub fn fetch_sub(&self, val: T) -> T {
72 let _lock = self.array.lock_index(self.local_index);
73 unsafe {
74 let old = self.array.__local_as_mut_slice()[self.local_index];
75 self.array.__local_as_mut_slice()[self.local_index] -= val;
76 old
77 }
78 }
79 pub fn fetch_mul(&self, val: T) -> T {
80 let _lock = self.array.lock_index(self.local_index);
81 unsafe {
82 let old = self.array.__local_as_mut_slice()[self.local_index];
83 self.array.__local_as_mut_slice()[self.local_index] *= val;
84 old
85 }
86 }
87 pub fn fetch_div(&self, val: T) -> T {
88 let _lock = self.array.lock_index(self.local_index);
89 unsafe {
90 let old = self.array.__local_as_mut_slice()[self.local_index];
91 self.array.__local_as_mut_slice()[self.local_index] /= val;
92 old
93 }
94 }
95 pub fn fetch_rem(&self, val: T) -> T {
96 let _lock = self.array.lock_index(self.local_index);
97 unsafe {
98 let old = self.array.__local_as_mut_slice()[self.local_index];
99 self.array.__local_as_mut_slice()[self.local_index] %= val;
100 old
101 }
102 }
103}
104
105impl<T: Dist + std::cmp::Eq> GenericAtomicElement<T> {
106 pub fn compare_exchange(&self, current: T, new: T) -> Result<T, T> {
107 let _lock = self.array.lock_index(self.local_index);
108 let current_val = unsafe { self.array.__local_as_mut_slice()[self.local_index] };
109 if current_val == current {
110 unsafe {
111 self.array.__local_as_mut_slice()[self.local_index] = new;
112 }
113 Ok(current_val)
114 } else {
115 Err(current_val)
116 }
117 }
118}
119impl<T: Dist + std::cmp::PartialEq + std::cmp::PartialOrd + std::ops::Sub<Output = T>>
120 GenericAtomicElement<T>
121{
122 pub fn compare_exchange_epsilon(&self, current: T, new: T, eps: T) -> Result<T, T> {
123 let _lock = self.array.lock_index(self.local_index);
124 let current_val = unsafe { self.array.__local_as_mut_slice()[self.local_index] };
125 let same = if current_val > current {
126 current_val - current < eps
127 } else {
128 current - current_val < eps
129 };
130 if same {
131 unsafe {
132 self.array.__local_as_mut_slice()[self.local_index] = new;
133 }
134 Ok(current_val)
135 } else {
136 Err(current_val)
137 }
138 }
139}
140
141impl<T: ElementBitWiseOps + 'static> GenericAtomicElement<T> {
142 pub fn fetch_and(&self, val: T) -> T {
143 let _lock = self.array.lock_index(self.local_index);
144 unsafe {
145 let old = self.array.__local_as_mut_slice()[self.local_index];
146 self.array.__local_as_mut_slice()[self.local_index] &= val;
147 old
148 }
149 }
150 pub fn fetch_or(&self, val: T) -> T {
151 let _lock = self.array.lock_index(self.local_index);
152 unsafe {
153 let old = self.array.__local_as_mut_slice()[self.local_index];
154 self.array.__local_as_mut_slice()[self.local_index] |= val;
155 old
156 }
157 }
158 pub fn fetch_xor(&self, val: T) -> T {
159 let _lock = self.array.lock_index(self.local_index);
160 unsafe {
161 let old = self.array.__local_as_mut_slice()[self.local_index];
162 self.array.__local_as_mut_slice()[self.local_index] ^= val;
163 old
164 }
165 }
166}
167
168impl<T: ElementShiftOps + 'static> GenericAtomicElement<T> {
169 pub fn fetch_shl(&self, val: T) -> T {
170 let _lock = self.array.lock_index(self.local_index);
171 unsafe {
172 let old = self.array.__local_as_mut_slice()[self.local_index];
173 self.array.__local_as_mut_slice()[self.local_index] <<= val;
174 old
175 }
176 }
177 pub fn fetch_shr(&self, val: T) -> T {
178 let _lock = self.array.lock_index(self.local_index);
179 unsafe {
180 let old = self.array.__local_as_mut_slice()[self.local_index];
181 self.array.__local_as_mut_slice()[self.local_index] >>= val;
182 old
183 }
184 }
185}
186
// Locked in-place `+=` on this element (lock taken for the update; previous value discarded).
impl<T: Dist + ElementArithmeticOps> AddAssign<T> for GenericAtomicElement<T> {
    fn add_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] += val }
    }
}
194
// Locked in-place `-=` on this element.
impl<T: Dist + ElementArithmeticOps> SubAssign<T> for GenericAtomicElement<T> {
    fn sub_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] -= val }
    }
}
201
// Locked in-place `*=` on this element.
impl<T: Dist + ElementArithmeticOps> MulAssign<T> for GenericAtomicElement<T> {
    fn mul_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] *= val }
    }
}
208
// Locked in-place `/=` on this element.
impl<T: Dist + ElementArithmeticOps> DivAssign<T> for GenericAtomicElement<T> {
    fn div_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] /= val }
    }
}
215
// Locked in-place `%=` on this element.
impl<T: Dist + ElementArithmeticOps> RemAssign<T> for GenericAtomicElement<T> {
    fn rem_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] %= val }
    }
}
222
// Locked in-place `&=` on this element.
impl<T: Dist + ElementBitWiseOps> BitAndAssign<T> for GenericAtomicElement<T> {
    fn bitand_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] &= val }
    }
}
229
// Locked in-place `|=` on this element.
impl<T: Dist + ElementBitWiseOps> BitOrAssign<T> for GenericAtomicElement<T> {
    fn bitor_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] |= val }
    }
}
236
// Locked in-place `^=` on this element.
impl<T: Dist + ElementBitWiseOps> BitXorAssign<T> for GenericAtomicElement<T> {
    fn bitxor_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] ^= val }
    }
}
243
// Locked in-place `<<=` on this element (method-call form required by the ElementShiftOps bound).
impl<T: Dist + ElementShiftOps> ShlAssign<T> for GenericAtomicElement<T> {
    fn shl_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index].shl_assign(val) }
    }
}
250
// Locked in-place `>>=` on this element.
impl<T: Dist + ElementShiftOps> ShrAssign<T> for GenericAtomicElement<T> {
    fn shr_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index].shr_assign(val) }
    }
}
257
// Debug-formats the current value, taking the element lock so the read is consistent.
impl<T: Dist + std::fmt::Debug> std::fmt::Debug for GenericAtomicElement<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let _lock = self.array.lock_index(self.local_index);
        let current_val = unsafe { self.array.__local_as_mut_slice()[self.local_index] };
        write!(f, "{current_val:?}")
    }
}
265
// Distributed array whose elements are made "atomic" by a per-element mutex
// table, for element types with no hardware atomic support.
#[lamellar_impl::AmDataRT(Clone, Debug)]
pub struct GenericAtomicArray<T> {
    // One Mutex per local element, shared across handles via a Darc.
    locks: Darc<Vec<Mutex<()>>>,
    // Backing storage; all access must go through the locks above.
    pub(crate) array: UnsafeArray<T>,
}
276
#[doc(hidden)]
// Type-erased (byte-level) view of a GenericAtomicArray; shares the same lock table.
#[lamellar_impl::AmDataRT(Clone, Debug)]
pub struct GenericAtomicByteArray {
    // Same per-element lock table as the typed array it was created from.
    locks: Darc<Vec<Mutex<()>>>,
    pub(crate) array: UnsafeByteArray,
}
283
284impl GenericAtomicByteArray {
285 pub fn lock_index(&self, index: usize) -> MutexGuard<()> {
287 let index = self
288 .array
289 .inner
290 .pe_full_offset_for_local_index(self.array.inner.data.my_pe, index)
291 .expect("invalid local index");
292 self.locks[index].lock()
293 }
294
295 pub fn downgrade(array: &GenericAtomicByteArray) -> GenericAtomicByteArrayWeak {
297 GenericAtomicByteArrayWeak {
298 locks: array.locks.clone(),
299 array: UnsafeByteArray::downgrade(&array.array),
300 }
301 }
302}
303
#[doc(hidden)]
// Weak counterpart of GenericAtomicByteArray: holds the lock table strongly,
// but only a weak reference to the byte array itself.
#[lamellar_impl::AmLocalDataRT(Clone, Debug)]
pub struct GenericAtomicByteArrayWeak {
    locks: Darc<Vec<Mutex<()>>>,
    pub(crate) array: UnsafeByteArrayWeak,
}
310
311impl GenericAtomicByteArrayWeak {
312 pub fn upgrade(&self) -> Option<GenericAtomicByteArray> {
314 Some(GenericAtomicByteArray {
315 locks: self.locks.clone(),
316 array: self.array.upgrade()?,
317 })
318 }
319}
320
#[doc(hidden)]
// A window over this PE's local elements of a GenericAtomicArray;
// `start_index`/`end_index` bound iteration (see `iter`/`sub_data`).
#[derive(Clone, Debug)]
pub struct GenericAtomicLocalData<T: Dist> {
    pub(crate) array: GenericAtomicArray<T>,
    start_index: usize,
    end_index: usize,
}
328
#[doc(hidden)]
// Iterator over a GenericAtomicLocalData window, yielding one
// GenericAtomicElement handle per local index in [index, end_index).
#[derive(Debug)]
pub struct GenericAtomicLocalDataIter<T: Dist> {
    array: GenericAtomicArray<T>,
    index: usize,
    end_index: usize,
}
336
337impl<T: Dist> GenericAtomicLocalData<T> {
338 pub fn at(&self, index: usize) -> GenericAtomicElement<T> {
339 GenericAtomicElement {
340 array: self.array.clone(),
341 local_index: index,
342 }
343 }
344
345 pub fn get_mut(&self, index: usize) -> Option<GenericAtomicElement<T>> {
346 Some(GenericAtomicElement {
347 array: self.array.clone(),
348 local_index: index,
349 })
350 }
351
352 pub fn len(&self) -> usize {
353 unsafe { self.array.__local_as_mut_slice().len() }
354 }
355
356 pub fn iter(&self) -> GenericAtomicLocalDataIter<T> {
357 GenericAtomicLocalDataIter {
358 array: self.array.clone(),
359 index: self.start_index,
360 end_index: self.end_index,
361 }
362 }
363
364 pub fn sub_data(&self, start_index: usize, end_index: usize) -> GenericAtomicLocalData<T> {
365 GenericAtomicLocalData {
366 array: self.array.clone(),
367 start_index: start_index,
368 end_index: std::cmp::min(end_index, self.array.num_elems_local()),
369 }
370 }
371}
372
// Serializes the local data as a sequence of element values, loading each
// through its lock.
// NOTE(review): iterates 0..len() — this ignores start_index/end_index;
// confirm sub_data views are meant to serialize the full local slice.
impl<T: Dist + serde::Serialize> serde::Serialize for GenericAtomicLocalData<T> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut s = serializer.serialize_seq(Some(self.len()))?;
        for i in 0..self.len() {
            s.serialize_element(&self.at(i).load())?;
        }
        s.end()
    }
}
385
// Consuming iteration over the [start_index, end_index) window; yields
// per-element handles rather than values.
impl<T: Dist> IntoIterator for GenericAtomicLocalData<T> {
    type Item = GenericAtomicElement<T>;
    type IntoIter = GenericAtomicLocalDataIter<T>;
    fn into_iter(self) -> Self::IntoIter {
        GenericAtomicLocalDataIter {
            array: self.array,
            index: self.start_index,
            end_index: self.end_index,
        }
    }
}
397
398impl<T: Dist> Iterator for GenericAtomicLocalDataIter<T> {
399 type Item = GenericAtomicElement<T>;
400 fn next(&mut self) -> Option<Self::Item> {
401 if self.index < self.end_index {
402 let index = self.index;
403 self.index += 1;
404 Some(GenericAtomicElement {
405 array: self.array.clone(),
406 local_index: index,
407 })
408 } else {
409 None
410 }
411 }
412}
413
414impl<T: Dist + ArrayOps + std::default::Default> GenericAtomicArray<T> {
415 pub(crate) fn new<U: Clone + Into<IntoLamellarTeam>>(
416 team: U,
417 array_size: usize,
418 distribution: Distribution,
419 ) -> GenericAtomicArrayHandle<T> {
420 let team = team.into().team.clone();
423 GenericAtomicArrayHandle {
424 team: team.clone(),
425 launched: false,
426 creation_future: Box::pin(async move {
427 let array = UnsafeArray::async_new(
428 team.clone(),
429 array_size,
430 distribution,
431 DarcMode::LocalLockArray,
432 )
433 .await;
434 let mut vec = vec![];
435 for _i in 0..array.num_elems_local() {
436 vec.push(Mutex::new(()));
437 }
438 GenericAtomicArray {
439 locks: Darc::new(team, vec).await.expect("pe exists in team"),
440 array,
441 }
442 }),
443 }
444 }
445}
446
447impl<T: Dist> GenericAtomicArray<T> {
448 pub(crate) fn get_element(&self, index: usize) -> Option<GenericAtomicElement<T>> {
449 if index < unsafe { self.__local_as_slice().len() } {
450 Some(GenericAtomicElement {
452 array: self.clone(),
453 local_index: index,
454 })
455 } else {
456 None
457 }
458 }
459}
#[doc(hidden)]
impl<T: Dist> GenericAtomicArray<T> {
    /// Return a handle to the same array that uses `distribution` for
    /// index-to-PE mapping (shares the same data and lock table).
    pub fn use_distribution(self, distribution: Distribution) -> Self {
        GenericAtomicArray {
            locks: self.locks.clone(),
            array: self.array.use_distribution(distribution),
        }
    }

    /// View over this PE's full local slice; element access goes through the
    /// per-element locks.
    pub fn local_data(&self) -> GenericAtomicLocalData<T> {
        GenericAtomicLocalData {
            array: self.clone(),
            start_index: 0,
            end_index: self.array.num_elems_local(),
        }
    }

    /// Identical to [`local_data`]: mutation is mediated by the locks, so no
    /// distinct mutable view type is needed.
    pub fn mut_local_data(&self) -> GenericAtomicLocalData<T> {
        GenericAtomicLocalData {
            array: self.clone(),
            start_index: 0,
            end_index: self.array.num_elems_local(),
        }
    }

    // SAFETY contract (caller): no concurrent writes to the returned slice —
    // this bypasses the element locks.
    // NOTE(review): delegates to local_as_mut_slice and reborrows shared;
    // presumably intentional reuse of the single unchecked accessor — confirm.
    pub unsafe fn __local_as_slice(&self) -> &[T] {
        self.array.local_as_mut_slice()
    }
    // SAFETY contract (caller): must hold the relevant element locks (or
    // otherwise guarantee exclusivity) for any slot it mutates.
    pub unsafe fn __local_as_mut_slice(&self) -> &mut [T] {
        self.array.local_as_mut_slice()
    }

    /// Convert into an [`UnsafeArray`]; the returned handle completes after
    /// outstanding operations on this array have finished.
    pub fn into_unsafe(self) -> IntoUnsafeArrayHandle<T> {
        IntoUnsafeArrayHandle {
            team: self.array.inner.data.team.clone(),
            launched: false,
            outstanding_future: Box::pin(self.async_into()),
        }
    }

    /// Convert into a read-only array (delegates to the inner UnsafeArray).
    pub fn into_read_only(self) -> IntoReadOnlyArrayHandle<T> {
        self.array.into_read_only()
    }

    /// Convert into a local-lock array (delegates to the inner UnsafeArray).
    pub fn into_local_lock(self) -> IntoLocalLockArrayHandle<T> {
        self.array.into_local_lock()
    }

    /// Convert into a global-lock array (delegates to the inner UnsafeArray).
    pub fn into_global_lock(self) -> IntoGlobalLockArrayHandle<T> {
        self.array.into_global_lock()
    }

    /// Acquire the mutex guarding the local element at `index`.
    ///
    /// # Panics
    /// Panics if `index` is not a valid local index.
    pub fn lock_index(&self, index: usize) -> MutexGuard<()> {
        // Translate the local index to the PE-wide offset used by the lock
        // table (same scheme as GenericAtomicByteArray::lock_index).
        let index = self
            .array
            .inner
            .pe_full_offset_for_local_index(self.array.inner.data.my_pe, index)
            .expect("invalid local index");
        self.locks[index].lock()
    }
}
548
impl<T: Dist + 'static> GenericAtomicArray<T> {
    #[doc(hidden)]
    /// Convert into an [`AtomicArray`] (delegates to the inner UnsafeArray).
    pub fn into_atomic(self) -> IntoAtomicArrayHandle<T> {
        self.array.into_atomic()
    }
}
556
// Builds the array from a local Vec by first constructing an UnsafeArray on
// the team, then converting it (async) into a GenericAtomicArray.
impl<T: Dist + ArrayOps> AsyncTeamFrom<(Vec<T>, Distribution)> for GenericAtomicArray<T> {
    async fn team_from(input: (Vec<T>, Distribution), team: &Arc<LamellarTeam>) -> Self {
        let array: UnsafeArray<T> = AsyncTeamInto::team_into(input, team).await;
        array.async_into().await
    }
}
564
565#[async_trait]
566impl<T: Dist> AsyncFrom<UnsafeArray<T>> for GenericAtomicArray<T> {
567 async fn async_from(array: UnsafeArray<T>) -> Self {
568 array
570 .await_on_outstanding(DarcMode::GenericAtomicArray)
571 .await;
572 let mut vec = vec![];
573 for _i in 0..array.num_elems_local() {
574 vec.push(Mutex::new(()));
575 }
576 let locks = Darc::new(array.team_rt(), vec).await.expect("PE in team");
577
578 GenericAtomicArray {
579 locks: locks,
580 array: array,
581 }
582 }
583}
584
// Type-erase to the byte-level view; the lock table is shared, not copied.
impl<T: Dist> From<GenericAtomicArray<T>> for GenericAtomicByteArray {
    fn from(array: GenericAtomicArray<T>) -> Self {
        GenericAtomicByteArray {
            locks: array.locks.clone(),
            array: array.array.into(),
        }
    }
}
593
// Wrap the byte-level view in the runtime's type-erased array enum.
impl<T: Dist> From<GenericAtomicArray<T>> for LamellarByteArray {
    fn from(array: GenericAtomicArray<T>) -> Self {
        LamellarByteArray::GenericAtomicArray(GenericAtomicByteArray {
            locks: array.locks.clone(),
            array: array.array.into(),
        })
    }
}
602
603impl<T: Dist> From<LamellarByteArray> for GenericAtomicArray<T> {
604 fn from(array: LamellarByteArray) -> Self {
605 if let LamellarByteArray::GenericAtomicArray(array) = array {
606 array.into()
607 } else {
608 panic!("Expected LamellarByteArray::GenericAtomicArray")
609 }
610 }
611}
612
// Type-erase into the AtomicByteArray enum (generic-atomic variant).
impl<T: Dist> From<GenericAtomicArray<T>> for AtomicByteArray {
    fn from(array: GenericAtomicArray<T>) -> Self {
        AtomicByteArray::GenericAtomicByteArray(GenericAtomicByteArray {
            locks: array.locks.clone(),
            array: array.array.into(),
        })
    }
}
// Re-type the byte-level view back into a typed array; shares the lock table.
impl<T: Dist> From<GenericAtomicByteArray> for GenericAtomicArray<T> {
    fn from(array: GenericAtomicByteArray) -> Self {
        GenericAtomicArray {
            locks: array.locks.clone(),
            array: array.array.into(),
        }
    }
}
629impl<T: Dist> From<GenericAtomicByteArray> for AtomicArray<T> {
630 fn from(array: GenericAtomicByteArray) -> Self {
631 GenericAtomicArray {
632 locks: array.locks.clone(),
633 array: array.array.into(),
634 }
635 .into()
636 }
637}
638
// Active-message execution context: delegate team handle and counters to the
// inner UnsafeArray.
impl<T: Dist> private::ArrayExecAm<T> for GenericAtomicArray<T> {
    fn team_rt(&self) -> Pin<Arc<LamellarTeamRT>> {
        self.array.team_rt()
    }
    fn team_counters(&self) -> Arc<AMCounters> {
        self.array.team_counters()
    }
}
647
// Runtime-internal array plumbing: everything forwards to the inner
// UnsafeArray except `as_lamellar_byte_array`, which type-erases self.
impl<T: Dist> private::LamellarArrayPrivate<T> for GenericAtomicArray<T> {
    fn inner_array(&self) -> &UnsafeArray<T> {
        &self.array
    }
    // NOTE(review): both ptr accessors delegate to local_as_mut_ptr;
    // presumably the inner array exposes only the mut variant — confirm.
    fn local_as_ptr(&self) -> *const T {
        self.array.local_as_mut_ptr()
    }
    fn local_as_mut_ptr(&self) -> *mut T {
        self.array.local_as_mut_ptr()
    }
    fn pe_for_dist_index(&self, index: usize) -> Option<usize> {
        self.array.pe_for_dist_index(index)
    }
    fn pe_offset_for_dist_index(&self, pe: usize, index: usize) -> Option<usize> {
        self.array.pe_offset_for_dist_index(pe, index)
    }
    // SAFETY contract (caller): discards the lock table, so the caller takes
    // responsibility for synchronizing access to the returned UnsafeArray.
    unsafe fn into_inner(self) -> UnsafeArray<T> {
        self.array
    }
    fn as_lamellar_byte_array(&self) -> LamellarByteArray {
        self.clone().into()
    }
}
671
// ActiveMessaging: all operations forward to the inner UnsafeArray, using the
// task-group (`_tg`) variants so AMs are accounted against this array.
impl<T: Dist> ActiveMessaging for GenericAtomicArray<T> {
    type SinglePeAmHandle<R: AmDist> = AmHandle<R>;
    type MultiAmHandle<R: AmDist> = MultiAmHandle<R>;
    type LocalAmHandle<L> = LocalAmHandle<L>;
    // Launch `am` on every PE in the team.
    fn exec_am_all<F>(&self, am: F) -> Self::MultiAmHandle<F::Output>
    where
        F: RemoteActiveMessage + LamellarAM + Serde + AmDist,
    {
        self.array.exec_am_all_tg(am)
    }
    // Launch `am` on the given PE.
    fn exec_am_pe<F>(&self, pe: usize, am: F) -> Self::SinglePeAmHandle<F::Output>
    where
        F: RemoteActiveMessage + LamellarAM + Serde + AmDist,
    {
        self.array.exec_am_pe_tg(pe, am)
    }
    // Launch a local (non-serialized) AM on this PE.
    fn exec_am_local<F>(&self, am: F) -> Self::LocalAmHandle<F::Output>
    where
        F: LamellarActiveMessage + LocalAM + 'static,
    {
        self.array.exec_am_local_tg(am)
    }
    fn wait_all(&self) {
        self.array.wait_all()
    }
    fn await_all(&self) -> impl Future<Output = ()> + Send {
        self.array.await_all()
    }
    fn barrier(&self) {
        self.array.barrier()
    }
    fn async_barrier(&self) -> BarrierHandle {
        self.array.async_barrier()
    }
    fn spawn<F: Future>(&self, f: F) -> LamellarTask<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send,
    {
        self.array.spawn(f)
    }
    fn block_on<F: Future>(&self, f: F) -> F::Output {
        self.array.block_on(f)
    }
    fn block_on_all<I>(&self, iter: I) -> Vec<<<I as IntoIterator>::Item as Future>::Output>
    where
        I: IntoIterator,
        <I as IntoIterator>::Item: Future + Send + 'static,
        <<I as IntoIterator>::Item as Future>::Output: Send,
    {
        self.array.block_on_all(iter)
    }
}
725
// Size/indexing queries: straight delegation to the inner UnsafeArray.
impl<T: Dist> LamellarArray<T> for GenericAtomicArray<T> {
    fn len(&self) -> usize {
        self.array.len()
    }
    fn num_elems_local(&self) -> usize {
        self.array.num_elems_local()
    }
    fn pe_and_offset_for_global_index(&self, index: usize) -> Option<(usize, usize)> {
        self.array.pe_and_offset_for_global_index(index)
    }
    fn first_global_index_for_pe(&self, pe: usize) -> Option<usize> {
        self.array.first_global_index_for_pe(pe)
    }

    fn last_global_index_for_pe(&self, pe: usize) -> Option<usize> {
        self.array.last_global_index_for_pe(pe)
    }
}
764
// Environment queries: delegate to the inner array / its runtime team.
impl<T: Dist> LamellarEnv for GenericAtomicArray<T> {
    fn my_pe(&self) -> usize {
        // Fully-qualified call to disambiguate from any inherent my_pe.
        LamellarEnv::my_pe(&self.array)
    }

    fn num_pes(&self) -> usize {
        LamellarEnv::num_pes(&self.array)
    }
    fn num_threads_per_pe(&self) -> usize {
        self.array.team_rt().num_threads()
    }
    fn world(&self) -> Arc<LamellarTeam> {
        self.array.team_rt().world()
    }
    fn team(&self) -> Arc<LamellarTeam> {
        self.array.team_rt().team()
    }
}
783
// Marker traits: a GenericAtomicArray may be used as both a write target and
// a read source for RDMA-style operations.
impl<T: Dist> LamellarWrite for GenericAtomicArray<T> {}
impl<T: Dist> LamellarRead for GenericAtomicArray<T> {}
786
// Sub-array views share the full lock table; only the inner array is narrowed.
impl<T: Dist> SubArray<T> for GenericAtomicArray<T> {
    type Array = GenericAtomicArray<T>;
    fn sub_array<R: std::ops::RangeBounds<usize>>(&self, range: R) -> Self::Array {
        GenericAtomicArray {
            locks: self.locks.clone(),
            array: self.array.sub_array(range),
        }
    }
    fn global_index(&self, sub_index: usize) -> usize {
        self.array.global_index(sub_index)
    }
}
799
impl<T: Dist + std::fmt::Debug> GenericAtomicArray<T> {
    #[doc(alias = "Collective")]
    /// Print the array contents; collective — must be called by all PEs.
    /// Delegates to the inner UnsafeArray (does not take element locks).
    pub fn print(&self) {
        self.array.print();
    }
}
822
// Trait form of `print` above; same delegation.
impl<T: Dist + std::fmt::Debug> ArrayPrint<T> for GenericAtomicArray<T> {
    fn print(&self) {
        self.array.print()
    }
}
828
impl<T: Dist + AmDist + 'static> GenericAtomicArray<T> {
    #[doc(hidden)]
    /// Launch the named registered reduction (`op`) over the whole array;
    /// returns a handle resolving to the result (None on empty input,
    /// presumably — confirm against reduce_data).
    pub fn reduce(&self, op: &str) -> AmHandle<Option<T>> {
        self.array.reduce_data(op, self.clone().into())
    }
}
impl<T: Dist + AmDist + ElementArithmeticOps + 'static> GenericAtomicArray<T> {
    #[doc(hidden)]
    /// Sum reduction over all elements (named reduction "sum").
    pub fn sum(&self) -> AmHandle<Option<T>> {
        self.reduce("sum")
    }
    #[doc(hidden)]
    /// Product reduction over all elements (named reduction "prod").
    pub fn prod(&self) -> AmHandle<Option<T>> {
        self.reduce("prod")
    }
}
impl<T: Dist + AmDist + ElementComparePartialEqOps + 'static> GenericAtomicArray<T> {
    #[doc(hidden)]
    /// Maximum reduction over all elements (named reduction "max").
    pub fn max(&self) -> AmHandle<Option<T>> {
        self.reduce("max")
    }
    #[doc(hidden)]
    /// Minimum reduction over all elements (named reduction "min").
    pub fn min(&self) -> AmHandle<Option<T>> {
        self.reduce("min")
    }
}
855
#[doc(hidden)]
/// A single standalone "atomic" value guarded by its own mutex (no backing
/// array); the local analogue of [`GenericAtomicElement`].
pub struct LocalGenericAtomicElement<T> {
    pub(crate) val: Mutex<T>,
}
860
/// Wraps a `LocalGenericAtomicElement` in the type-erased [`AtomicElement`] enum.
impl<T: Dist> From<LocalGenericAtomicElement<T>> for AtomicElement<T> {
    fn from(element: LocalGenericAtomicElement<T>) -> AtomicElement<T> {
        AtomicElement::LocalGenericAtomicElement(element)
    }
}
866
867impl<T: Dist> LocalGenericAtomicElement<T> {
868 pub fn load(&self) -> T {
869 *self.val.lock()
870 }
871 pub fn store(&self, val: T) {
872 *self.val.lock() = val
873 }
874 pub fn swap(&self, val: T) -> T {
875 let old = *self.val.lock();
876 *self.val.lock() = val;
877 old
878 }
879}
880impl<T: ElementArithmeticOps> LocalGenericAtomicElement<T> {
881 pub fn fetch_add(&self, val: T) -> T {
882 let old = *self.val.lock();
883 *self.val.lock() += val;
884 old
885 }
886 pub fn fetch_sub(&self, val: T) -> T {
887 let old = *self.val.lock();
888 *self.val.lock() -= val;
889 old
890 }
891 pub fn fetch_mul(&self, val: T) -> T {
892 let old = *self.val.lock();
893 *self.val.lock() *= val;
894 old
895 }
896 pub fn fetch_div(&self, val: T) -> T {
897 let old = *self.val.lock();
898 *self.val.lock() /= val;
899 old
900 }
901 pub fn fetch_rem(&self, val: T) -> T {
902 let old = *self.val.lock();
903 *self.val.lock() %= val;
904 old
905 }
906}
907
908impl<T: Dist + std::cmp::Eq> LocalGenericAtomicElement<T> {
909 pub fn compare_exchange(&self, current: T, new: T) -> Result<T, T> {
910 let current_val = *self.val.lock();
911 if current_val == current {
912 *self.val.lock() = new;
913
914 Ok(current_val)
915 } else {
916 Err(current_val)
917 }
918 }
919}
920impl<T: Dist + std::cmp::PartialEq + std::cmp::PartialOrd + std::ops::Sub<Output = T>>
921 LocalGenericAtomicElement<T>
922{
923 pub fn compare_exchange_epsilon(&self, current: T, new: T, eps: T) -> Result<T, T> {
924 let current_val = *self.val.lock();
925 let same = if current_val > current {
926 current_val - current < eps
927 } else {
928 current - current_val < eps
929 };
930 if same {
931 *self.val.lock() = new;
932
933 Ok(current_val)
934 } else {
935 Err(current_val)
936 }
937 }
938}
939
940impl<T: ElementBitWiseOps + 'static> LocalGenericAtomicElement<T> {
941 pub fn fetch_and(&self, val: T) -> T {
942 let old = *self.val.lock();
943 *self.val.lock() &= val;
944 old
945 }
946 pub fn fetch_or(&self, val: T) -> T {
947 let old = *self.val.lock();
948 *self.val.lock() |= val;
949 old
950 }
951 pub fn fetch_xor(&self, val: T) -> T {
952 let old = *self.val.lock();
953 *self.val.lock() ^= val;
954 old
955 }
956}
957
958impl<T: ElementShiftOps + 'static> LocalGenericAtomicElement<T> {
959 pub fn fetch_shl(&self, val: T) -> T {
960 let old = *self.val.lock();
961 *self.val.lock() <<= val;
962 old
963 }
964 pub fn fetch_shr(&self, val: T) -> T {
965 let old = *self.val.lock();
966 *self.val.lock() >>= val;
967 old
968 }
969}
970
// Locked in-place `+=` on the wrapped value (single lock; no old value returned).
impl<T: Dist + ElementArithmeticOps> AddAssign<T> for LocalGenericAtomicElement<T> {
    fn add_assign(&mut self, val: T) {
        *self.val.lock() += val
    }
}
977
// Locked in-place `-=` on the wrapped value.
impl<T: Dist + ElementArithmeticOps> SubAssign<T> for LocalGenericAtomicElement<T> {
    fn sub_assign(&mut self, val: T) {
        *self.val.lock() -= val
    }
}
983
// Locked in-place `*=` on the wrapped value.
impl<T: Dist + ElementArithmeticOps> MulAssign<T> for LocalGenericAtomicElement<T> {
    fn mul_assign(&mut self, val: T) {
        *self.val.lock() *= val
    }
}
989
// Locked in-place `/=` on the wrapped value.
impl<T: Dist + ElementArithmeticOps> DivAssign<T> for LocalGenericAtomicElement<T> {
    fn div_assign(&mut self, val: T) {
        *self.val.lock() /= val
    }
}
995
// Locked in-place `%=` on the wrapped value.
impl<T: Dist + ElementArithmeticOps> RemAssign<T> for LocalGenericAtomicElement<T> {
    fn rem_assign(&mut self, val: T) {
        *self.val.lock() %= val
    }
}
1001
// Locked in-place `&=` on the wrapped value.
impl<T: Dist + ElementBitWiseOps> BitAndAssign<T> for LocalGenericAtomicElement<T> {
    fn bitand_assign(&mut self, val: T) {
        *self.val.lock() &= val
    }
}
1007
// Locked in-place `|=` on the wrapped value.
impl<T: Dist + ElementBitWiseOps> BitOrAssign<T> for LocalGenericAtomicElement<T> {
    fn bitor_assign(&mut self, val: T) {
        *self.val.lock() |= val
    }
}
1013
// Locked in-place `^=` on the wrapped value.
impl<T: Dist + ElementBitWiseOps> BitXorAssign<T> for LocalGenericAtomicElement<T> {
    fn bitxor_assign(&mut self, val: T) {
        *self.val.lock() ^= val
    }
}
1019
// Locked in-place `<<=` on the wrapped value (method-call form per ElementShiftOps).
impl<T: Dist + ElementShiftOps> ShlAssign<T> for LocalGenericAtomicElement<T> {
    fn shl_assign(&mut self, val: T) {
        self.val.lock().shl_assign(val)
    }
}
1025
// Locked in-place `>>=` on the wrapped value.
impl<T: Dist + ElementShiftOps> ShrAssign<T> for LocalGenericAtomicElement<T> {
    fn shr_assign(&mut self, val: T) {
        self.val.lock().shr_assign(val)
    }
}
1031
// Debug-formats the current value under the lock.
impl<T: Dist + std::fmt::Debug> std::fmt::Debug for LocalGenericAtomicElement<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let current_val = *self.val.lock();
        write!(f, "{current_val:?}")
    }
}