lamellar/array/
generic_atomic.rs

1//! This module defines the `GenericAtomicArray` type, which provides atomic access to elements of an array for any type that implements the `Dist` trait through the use of element-wise mutexes.
2
3mod handle;
4pub(crate) use handle::*;
5
6pub(crate) mod iteration;
7pub(crate) mod operations;
8mod rdma;
9use crate::array::atomic::AtomicElement;
10// use crate::array::private::LamellarArrayPrivate;
11use crate::array::private::ArrayExecAm;
12use crate::array::r#unsafe::{UnsafeByteArray, UnsafeByteArrayWeak};
13use crate::array::*;
14use crate::barrier::BarrierHandle;
15use crate::darc::Darc;
16use crate::darc::DarcMode;
17use crate::lamellar_team::{IntoLamellarTeam, LamellarTeamRT};
18use crate::memregion::Dist;
19use crate::scheduler::LamellarTask;
20
21use parking_lot::{Mutex, MutexGuard};
22use serde::ser::SerializeSeq;
23// use std::ops::{Deref, DerefMut};
24
25use std::ops::{
26    AddAssign, BitAndAssign, BitOrAssign, BitXorAssign, DivAssign, MulAssign, RemAssign, ShlAssign,
27    ShrAssign, SubAssign,
28};
29
#[doc(hidden)]
/// A handle to a single element of a [`GenericAtomicArray`] living on the local PE.
/// All accesses through this handle acquire the element's per-index mutex,
/// providing atomic(ish) semantics for arbitrary `Dist` types.
pub struct GenericAtomicElement<T> {
    array: GenericAtomicArray<T>,
    // Index into the *local* slice of `array`, not a global index.
    local_index: usize,
}
35
36impl<T: Dist> From<GenericAtomicElement<T>> for AtomicElement<T> {
37    fn from(element: GenericAtomicElement<T>) -> AtomicElement<T> {
38        AtomicElement::GenericAtomicElement(element)
39    }
40}
41
impl<T: Dist> GenericAtomicElement<T> {
    /// Atomically reads the element's value (holds the per-element mutex for the read).
    pub fn load(&self) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] }
    }
    /// Atomically overwrites the element's value with `val`.
    pub fn store(&self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            self.array.__local_as_mut_slice()[self.local_index] = val;
        }
    }
    /// Atomically replaces the element's value with `val`, returning the previous value.
    /// Read and write occur under a single lock acquisition, so no other access
    /// can interleave between them.
    pub fn swap(&self, val: T) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            let old = self.array.__local_as_mut_slice()[self.local_index];
            self.array.__local_as_mut_slice()[self.local_index] = val;
            old
        }
    }
}
// Arithmetic fetch-and-op operations. Each op acquires the element's mutex once
// and performs the read-modify-write while holding it, returning the prior value.
impl<T: ElementArithmeticOps> GenericAtomicElement<T> {
    /// Atomically adds `val` to the element, returning the previous value.
    pub fn fetch_add(&self, val: T) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            let old = self.array.__local_as_mut_slice()[self.local_index];
            self.array.__local_as_mut_slice()[self.local_index] += val;
            old
        }
    }
    /// Atomically subtracts `val` from the element, returning the previous value.
    pub fn fetch_sub(&self, val: T) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            let old = self.array.__local_as_mut_slice()[self.local_index];
            self.array.__local_as_mut_slice()[self.local_index] -= val;
            old
        }
    }
    /// Atomically multiplies the element by `val`, returning the previous value.
    pub fn fetch_mul(&self, val: T) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            let old = self.array.__local_as_mut_slice()[self.local_index];
            self.array.__local_as_mut_slice()[self.local_index] *= val;
            old
        }
    }
    /// Atomically divides the element by `val`, returning the previous value.
    pub fn fetch_div(&self, val: T) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            let old = self.array.__local_as_mut_slice()[self.local_index];
            self.array.__local_as_mut_slice()[self.local_index] /= val;
            old
        }
    }
    /// Atomically takes the element modulo `val`, returning the previous value.
    pub fn fetch_rem(&self, val: T) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            let old = self.array.__local_as_mut_slice()[self.local_index];
            self.array.__local_as_mut_slice()[self.local_index] %= val;
            old
        }
    }
}
104
impl<T: Dist + std::cmp::Eq> GenericAtomicElement<T> {
    /// Atomically stores `new` if the element currently equals `current`.
    /// Returns `Ok(previous)` on success, `Err(actual)` on failure.
    /// The whole compare-and-swap happens under one lock acquisition.
    pub fn compare_exchange(&self, current: T, new: T) -> Result<T, T> {
        let _lock = self.array.lock_index(self.local_index);
        let current_val = unsafe { self.array.__local_as_mut_slice()[self.local_index] };
        if current_val == current {
            unsafe {
                self.array.__local_as_mut_slice()[self.local_index] = new;
            }
            Ok(current_val)
        } else {
            Err(current_val)
        }
    }
}
impl<T: Dist + std::cmp::PartialEq + std::cmp::PartialOrd + std::ops::Sub<Output = T>>
    GenericAtomicElement<T>
{
    /// Like [`compare_exchange`](Self::compare_exchange) but treats the values as
    /// equal when `|current_val - current| < eps` (useful for floating point).
    /// The subtraction is ordered (larger minus smaller) so it also works for
    /// unsigned types where `a - b` would underflow.
    pub fn compare_exchange_epsilon(&self, current: T, new: T, eps: T) -> Result<T, T> {
        let _lock = self.array.lock_index(self.local_index);
        let current_val = unsafe { self.array.__local_as_mut_slice()[self.local_index] };
        let same = if current_val > current {
            current_val - current < eps
        } else {
            current - current_val < eps
        };
        if same {
            unsafe {
                self.array.__local_as_mut_slice()[self.local_index] = new;
            }
            Ok(current_val)
        } else {
            Err(current_val)
        }
    }
}
140
// Bitwise fetch-and-op operations; same locking discipline as the arithmetic ops.
impl<T: ElementBitWiseOps + 'static> GenericAtomicElement<T> {
    /// Atomically ANDs `val` into the element, returning the previous value.
    pub fn fetch_and(&self, val: T) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            let old = self.array.__local_as_mut_slice()[self.local_index];
            self.array.__local_as_mut_slice()[self.local_index] &= val;
            old
        }
    }
    /// Atomically ORs `val` into the element, returning the previous value.
    pub fn fetch_or(&self, val: T) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            let old = self.array.__local_as_mut_slice()[self.local_index];
            self.array.__local_as_mut_slice()[self.local_index] |= val;
            old
        }
    }
    /// Atomically XORs `val` into the element, returning the previous value.
    pub fn fetch_xor(&self, val: T) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            let old = self.array.__local_as_mut_slice()[self.local_index];
            self.array.__local_as_mut_slice()[self.local_index] ^= val;
            old
        }
    }
}
167
// Shift fetch-and-op operations; same locking discipline as the arithmetic ops.
impl<T: ElementShiftOps + 'static> GenericAtomicElement<T> {
    /// Atomically shifts the element left by `val`, returning the previous value.
    pub fn fetch_shl(&self, val: T) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            let old = self.array.__local_as_mut_slice()[self.local_index];
            self.array.__local_as_mut_slice()[self.local_index] <<= val;
            old
        }
    }
    /// Atomically shifts the element right by `val`, returning the previous value.
    pub fn fetch_shr(&self, val: T) -> T {
        let _lock = self.array.lock_index(self.local_index);
        unsafe {
            let old = self.array.__local_as_mut_slice()[self.local_index];
            self.array.__local_as_mut_slice()[self.local_index] >>= val;
            old
        }
    }
}
186
// Operator-assign impls (`+=`, `-=`, `*=`, ...). Each acquires the element's
// mutex and applies the operation in place under the lock; no previous value is
// returned (use the `fetch_*` methods for that).
impl<T: Dist + ElementArithmeticOps> AddAssign<T> for GenericAtomicElement<T> {
    fn add_assign(&mut self, val: T) {
        // self.add(val)
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] += val }
    }
}

impl<T: Dist + ElementArithmeticOps> SubAssign<T> for GenericAtomicElement<T> {
    fn sub_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] -= val }
    }
}

impl<T: Dist + ElementArithmeticOps> MulAssign<T> for GenericAtomicElement<T> {
    fn mul_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] *= val }
    }
}

impl<T: Dist + ElementArithmeticOps> DivAssign<T> for GenericAtomicElement<T> {
    fn div_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] /= val }
    }
}

impl<T: Dist + ElementArithmeticOps> RemAssign<T> for GenericAtomicElement<T> {
    fn rem_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] %= val }
    }
}

impl<T: Dist + ElementBitWiseOps> BitAndAssign<T> for GenericAtomicElement<T> {
    fn bitand_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] &= val }
    }
}

impl<T: Dist + ElementBitWiseOps> BitOrAssign<T> for GenericAtomicElement<T> {
    fn bitor_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] |= val }
    }
}

impl<T: Dist + ElementBitWiseOps> BitXorAssign<T> for GenericAtomicElement<T> {
    fn bitxor_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index] ^= val }
    }
}

impl<T: Dist + ElementShiftOps> ShlAssign<T> for GenericAtomicElement<T> {
    fn shl_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        // explicit method call form (the `<<=` operator would re-borrow ambiguously here)
        unsafe { self.array.__local_as_mut_slice()[self.local_index].shl_assign(val) }
    }
}

impl<T: Dist + ElementShiftOps> ShrAssign<T> for GenericAtomicElement<T> {
    fn shr_assign(&mut self, val: T) {
        let _lock = self.array.lock_index(self.local_index);
        unsafe { self.array.__local_as_mut_slice()[self.local_index].shr_assign(val) }
    }
}
257
impl<T: Dist + std::fmt::Debug> std::fmt::Debug for GenericAtomicElement<T> {
    /// Formats the element's current value; the read is performed under the element lock.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let _lock = self.array.lock_index(self.local_index);
        let current_val = unsafe { self.array.__local_as_mut_slice()[self.local_index] };
        write!(f, "{current_val:?}")
    }
}
265
/// A variant of an [AtomicArray] providing atomic access for any type that implements [Dist][crate::memregion::Dist].
///
/// Atomicity is guaranteed by constructing a 1-Byte mutex for each element in the array.
///
/// Generally any operation on this array type will be performed via an internal runtime Active Message, i.e. direct RDMA operations are not allowed
#[lamellar_impl::AmDataRT(Clone, Debug)]
pub struct GenericAtomicArray<T> {
    // One mutex per local element; shared across handles via a Darc.
    locks: Darc<Vec<Mutex<()>>>,
    pub(crate) array: UnsafeArray<T>,
}
276
#[doc(hidden)]
/// Type-erased (byte-level) view of a [`GenericAtomicArray`], sharing the same
/// element locks so byte-level accesses remain mutually exclusive with typed ones.
#[lamellar_impl::AmDataRT(Clone, Debug)]
pub struct GenericAtomicByteArray {
    locks: Darc<Vec<Mutex<()>>>,
    pub(crate) array: UnsafeByteArray,
}
283
impl GenericAtomicByteArray {
    //#[doc(hidden)]
    /// Locks the mutex guarding local element `index`.
    ///
    /// The local index is first translated to a full (PE-wide) offset so the
    /// correct entry of the shared lock vector is used.
    /// Panics if `index` is not a valid local index.
    pub fn lock_index(&self, index: usize) -> MutexGuard<()> {
        let index = self
            .array
            .inner
            .pe_full_offset_for_local_index(self.array.inner.data.my_pe, index)
            .expect("invalid local index");
        self.locks[index].lock()
    }

    //#[doc(hidden)]
    /// Creates a weak handle that does not keep the underlying array alive.
    pub fn downgrade(array: &GenericAtomicByteArray) -> GenericAtomicByteArrayWeak {
        GenericAtomicByteArrayWeak {
            locks: array.locks.clone(),
            array: UnsafeByteArray::downgrade(&array.array),
        }
    }
}
303
#[doc(hidden)]
/// Weak counterpart of [`GenericAtomicByteArray`]; must be upgraded before use.
#[lamellar_impl::AmLocalDataRT(Clone, Debug)]
pub struct GenericAtomicByteArrayWeak {
    locks: Darc<Vec<Mutex<()>>>,
    pub(crate) array: UnsafeByteArrayWeak,
}
310
impl GenericAtomicByteArrayWeak {
    //#[doc(hidden)]
    /// Attempts to upgrade to a strong handle; returns `None` if the underlying
    /// array has already been dropped.
    pub fn upgrade(&self) -> Option<GenericAtomicByteArray> {
        Some(GenericAtomicByteArray {
            locks: self.locks.clone(),
            array: self.array.upgrade()?,
        })
    }
}
320
#[doc(hidden)]
/// A view over a contiguous range `[start_index, end_index)` of this PE's local
/// elements, yielding [`GenericAtomicElement`] handles.
#[derive(Clone, Debug)]
pub struct GenericAtomicLocalData<T: Dist> {
    pub(crate) array: GenericAtomicArray<T>,
    start_index: usize,
    end_index: usize,
}
328
#[doc(hidden)]
/// Iterator over a [`GenericAtomicLocalData`] view; `index` advances toward `end_index`.
#[derive(Debug)]
pub struct GenericAtomicLocalDataIter<T: Dist> {
    array: GenericAtomicArray<T>,
    index: usize,
    end_index: usize,
}
336
337impl<T: Dist> GenericAtomicLocalData<T> {
338    pub fn at(&self, index: usize) -> GenericAtomicElement<T> {
339        GenericAtomicElement {
340            array: self.array.clone(),
341            local_index: index,
342        }
343    }
344
345    pub fn get_mut(&self, index: usize) -> Option<GenericAtomicElement<T>> {
346        Some(GenericAtomicElement {
347            array: self.array.clone(),
348            local_index: index,
349        })
350    }
351
352    pub fn len(&self) -> usize {
353        unsafe { self.array.__local_as_mut_slice().len() }
354    }
355
356    pub fn iter(&self) -> GenericAtomicLocalDataIter<T> {
357        GenericAtomicLocalDataIter {
358            array: self.array.clone(),
359            index: self.start_index,
360            end_index: self.end_index,
361        }
362    }
363
364    pub fn sub_data(&self, start_index: usize, end_index: usize) -> GenericAtomicLocalData<T> {
365        GenericAtomicLocalData {
366            array: self.array.clone(),
367            start_index: start_index,
368            end_index: std::cmp::min(end_index, self.array.num_elems_local()),
369        }
370    }
371}
372
impl<T: Dist + serde::Serialize> serde::Serialize for GenericAtomicLocalData<T> {
    /// Serializes the local data as a sequence; each element is read atomically
    /// via `load()` (one lock acquisition per element).
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut s = serializer.serialize_seq(Some(self.len()))?;
        for i in 0..self.len() {
            s.serialize_element(&self.at(i).load())?;
        }
        s.end()
    }
}
385
impl<T: Dist> IntoIterator for GenericAtomicLocalData<T> {
    type Item = GenericAtomicElement<T>;
    type IntoIter = GenericAtomicLocalDataIter<T>;
    /// Consumes the view, iterating from `start_index` up to (not including) `end_index`.
    fn into_iter(self) -> Self::IntoIter {
        GenericAtomicLocalDataIter {
            array: self.array,
            index: self.start_index,
            end_index: self.end_index,
        }
    }
}
397
398impl<T: Dist> Iterator for GenericAtomicLocalDataIter<T> {
399    type Item = GenericAtomicElement<T>;
400    fn next(&mut self) -> Option<Self::Item> {
401        if self.index < self.end_index {
402            let index = self.index;
403            self.index += 1;
404            Some(GenericAtomicElement {
405                array: self.array.clone(),
406                local_index: index,
407            })
408        } else {
409            None
410        }
411    }
412}
413
impl<T: Dist + ArrayOps + std::default::Default> GenericAtomicArray<T> {
    /// Constructs a new `GenericAtomicArray` on the given team, returning a handle
    /// whose future allocates the backing `UnsafeArray` and one `Mutex<()>` per
    /// local element.
    pub(crate) fn new<U: Clone + Into<IntoLamellarTeam>>(
        team: U,
        array_size: usize,
        distribution: Distribution,
    ) -> GenericAtomicArrayHandle<T> {
        // println!("new generic_atomic array");

        let team = team.into().team.clone();
        GenericAtomicArrayHandle {
            team: team.clone(),
            launched: false,
            creation_future: Box::pin(async move {
                // NOTE(review): `DarcMode::LocalLockArray` is used here while
                // `AsyncFrom<UnsafeArray<T>>` below uses `DarcMode::GenericAtomicArray`
                // — confirm whether this mismatch is intentional.
                let array = UnsafeArray::async_new(
                    team.clone(),
                    array_size,
                    distribution,
                    DarcMode::LocalLockArray,
                )
                .await;
                // One lock per local element.
                let mut vec = vec![];
                for _i in 0..array.num_elems_local() {
                    vec.push(Mutex::new(()));
                }
                GenericAtomicArray {
                    locks: Darc::new(team, vec).await.expect("pe exists in team"),
                    array,
                }
            }),
        }
    }
}
446
447impl<T: Dist> GenericAtomicArray<T> {
448    pub(crate) fn get_element(&self, index: usize) -> Option<GenericAtomicElement<T>> {
449        if index < unsafe { self.__local_as_slice().len() } {
450            //We are only directly accessing the local slice for its len
451            Some(GenericAtomicElement {
452                array: self.clone(),
453                local_index: index,
454            })
455        } else {
456            None
457        }
458    }
459}
#[doc(hidden)]
impl<T: Dist> GenericAtomicArray<T> {
    //#[doc(hidden)]
    /// Returns a copy of this array handle with a different element distribution.
    pub fn use_distribution(self, distribution: Distribution) -> Self {
        GenericAtomicArray {
            locks: self.locks.clone(),
            array: self.array.use_distribution(distribution),
        }
    }

    //#[doc(hidden)]
    /// A view over all elements stored on the local PE.
    pub fn local_data(&self) -> GenericAtomicLocalData<T> {
        GenericAtomicLocalData {
            array: self.clone(),
            start_index: 0,
            end_index: self.array.num_elems_local(),
        }
    }

    //#[doc(hidden)]
    /// Same as [`local_data`](Self::local_data); mutation happens through the
    /// per-element locks, so no distinct mutable view type is needed.
    pub fn mut_local_data(&self) -> GenericAtomicLocalData<T> {
        GenericAtomicLocalData {
            array: self.clone(),
            start_index: 0,
            end_index: self.array.num_elems_local(),
        }
    }

    //#[doc(hidden)]
    /// Raw access to the local slice; callers must uphold the locking discipline themselves.
    pub unsafe fn __local_as_slice(&self) -> &[T] {
        self.array.local_as_mut_slice()
    }
    //#[doc(hidden)]
    /// Raw mutable access to the local slice; callers must hold the relevant element locks.
    pub unsafe fn __local_as_mut_slice(&self) -> &mut [T] {
        self.array.local_as_mut_slice()
    }

    //#[doc(hidden)]
    /// Converts this array into an [`UnsafeArray`], waiting on outstanding operations.
    pub fn into_unsafe(self) -> IntoUnsafeArrayHandle<T> {
        // println!("generic into_unsafe");
        // self.array.into()
        IntoUnsafeArrayHandle {
            team: self.array.inner.data.team.clone(),
            launched: false,
            outstanding_future: Box::pin(self.async_into()),
        }
    }

    //#[doc(hidden)]
    /// Converts this array into a read-only array.
    pub fn into_read_only(self) -> IntoReadOnlyArrayHandle<T> {
        // println!("generic into_read_only");
        self.array.into_read_only()
    }

    //#[doc(hidden)]
    /// Converts this array into a local-lock array.
    pub fn into_local_lock(self) -> IntoLocalLockArrayHandle<T> {
        // println!("generic into_local_lock");
        self.array.into_local_lock()
    }

    //#[doc(hidden)]
    /// Converts this array into a global-lock array.
    pub fn into_global_lock(self) -> IntoGlobalLockArrayHandle<T> {
        // println!("generic into_local_lock");
        self.array.into_global_lock()
    }

    //#[doc(hidden)]
    /// Locks the mutex guarding local element `index` (translated to a PE-wide
    /// offset into the shared lock vector). Panics on an invalid local index.
    pub fn lock_index(&self, index: usize) -> MutexGuard<()> {
        // if let Some(ref locks) = *self.locks {
        //     let start_index = (index * std::mem::size_of::<T>()) / self.orig_t_size;
        //     let end_index = ((index + 1) * std::mem::size_of::<T>()) / self.orig_t_size;
        //     let mut guards = vec![];
        //     for i in start_index..end_index {
        //         guards.push(locks[i].lock())
        //     }
        //     Some(guards)
        // } else {
        //     None
        // }
        // println!("trying to lock {:?}",index);
        let index = self
            .array
            .inner
            .pe_full_offset_for_local_index(self.array.inner.data.my_pe, index)
            .expect("invalid local index");
        self.locks[index].lock()
    }
}
548
impl<T: Dist + 'static> GenericAtomicArray<T> {
    #[doc(hidden)]
    /// Converts this array back into the [`AtomicArray`] wrapper type.
    pub fn into_atomic(self) -> IntoAtomicArrayHandle<T> {
        // println!("generic into_atomic");
        self.array.into_atomic()
    }
}
556
// #[async_trait]
impl<T: Dist + ArrayOps> AsyncTeamFrom<(Vec<T>, Distribution)> for GenericAtomicArray<T> {
    /// Builds the array from a local `Vec` by first constructing an `UnsafeArray`
    /// on the team and then converting it into a `GenericAtomicArray`.
    async fn team_from(input: (Vec<T>, Distribution), team: &Arc<LamellarTeam>) -> Self {
        let array: UnsafeArray<T> = AsyncTeamInto::team_into(input, team).await;
        array.async_into().await
    }
}
564
565#[async_trait]
566impl<T: Dist> AsyncFrom<UnsafeArray<T>> for GenericAtomicArray<T> {
567    async fn async_from(array: UnsafeArray<T>) -> Self {
568        // println!("generic from unsafe array");
569        array
570            .await_on_outstanding(DarcMode::GenericAtomicArray)
571            .await;
572        let mut vec = vec![];
573        for _i in 0..array.num_elems_local() {
574            vec.push(Mutex::new(()));
575        }
576        let locks = Darc::new(array.team_rt(), vec).await.expect("PE in team");
577
578        GenericAtomicArray {
579            locks: locks,
580            array: array,
581        }
582    }
583}
584
// Conversions between the typed array, its byte-level views, and the AtomicArray
// wrappers. All conversions share the same lock vector (via Darc clone), so
// atomicity is preserved across representations.
impl<T: Dist> From<GenericAtomicArray<T>> for GenericAtomicByteArray {
    fn from(array: GenericAtomicArray<T>) -> Self {
        GenericAtomicByteArray {
            locks: array.locks.clone(),
            array: array.array.into(),
        }
    }
}

impl<T: Dist> From<GenericAtomicArray<T>> for LamellarByteArray {
    fn from(array: GenericAtomicArray<T>) -> Self {
        LamellarByteArray::GenericAtomicArray(GenericAtomicByteArray {
            locks: array.locks.clone(),
            array: array.array.into(),
        })
    }
}

impl<T: Dist> From<LamellarByteArray> for GenericAtomicArray<T> {
    /// Panics if the byte array is not the `GenericAtomicArray` variant.
    fn from(array: LamellarByteArray) -> Self {
        if let LamellarByteArray::GenericAtomicArray(array) = array {
            array.into()
        } else {
            panic!("Expected LamellarByteArray::GenericAtomicArray")
        }
    }
}

impl<T: Dist> From<GenericAtomicArray<T>> for AtomicByteArray {
    fn from(array: GenericAtomicArray<T>) -> Self {
        AtomicByteArray::GenericAtomicByteArray(GenericAtomicByteArray {
            locks: array.locks.clone(),
            array: array.array.into(),
        })
    }
}
impl<T: Dist> From<GenericAtomicByteArray> for GenericAtomicArray<T> {
    fn from(array: GenericAtomicByteArray) -> Self {
        GenericAtomicArray {
            locks: array.locks.clone(),
            array: array.array.into(),
        }
    }
}
impl<T: Dist> From<GenericAtomicByteArray> for AtomicArray<T> {
    fn from(array: GenericAtomicByteArray) -> Self {
        GenericAtomicArray {
            locks: array.locks.clone(),
            array: array.array.into(),
        }
        .into()
    }
}
638
// Delegates AM execution context (team and counters) to the inner UnsafeArray.
impl<T: Dist> private::ArrayExecAm<T> for GenericAtomicArray<T> {
    fn team_rt(&self) -> Pin<Arc<LamellarTeamRT>> {
        self.array.team_rt()
    }
    fn team_counters(&self) -> Arc<AMCounters> {
        self.array.team_counters()
    }
}
647
// Private array plumbing: everything is forwarded to the inner UnsafeArray.
impl<T: Dist> private::LamellarArrayPrivate<T> for GenericAtomicArray<T> {
    fn inner_array(&self) -> &UnsafeArray<T> {
        &self.array
    }
    fn local_as_ptr(&self) -> *const T {
        self.array.local_as_mut_ptr()
    }
    fn local_as_mut_ptr(&self) -> *mut T {
        self.array.local_as_mut_ptr()
    }
    fn pe_for_dist_index(&self, index: usize) -> Option<usize> {
        self.array.pe_for_dist_index(index)
    }
    fn pe_offset_for_dist_index(&self, pe: usize, index: usize) -> Option<usize> {
        self.array.pe_offset_for_dist_index(pe, index)
    }
    // SAFETY contract of the trait: consumes self, exposing the unprotected inner array.
    unsafe fn into_inner(self) -> UnsafeArray<T> {
        self.array
    }
    fn as_lamellar_byte_array(&self) -> LamellarByteArray {
        self.clone().into()
    }
}
671
// ActiveMessaging is delegated wholesale to the inner UnsafeArray, which runs
// AMs within this array's team/task-group (`*_tg`) context.
impl<T: Dist> ActiveMessaging for GenericAtomicArray<T> {
    type SinglePeAmHandle<R: AmDist> = AmHandle<R>;
    type MultiAmHandle<R: AmDist> = MultiAmHandle<R>;
    type LocalAmHandle<L> = LocalAmHandle<L>;
    fn exec_am_all<F>(&self, am: F) -> Self::MultiAmHandle<F::Output>
    where
        F: RemoteActiveMessage + LamellarAM + Serde + AmDist,
    {
        self.array.exec_am_all_tg(am)
    }
    fn exec_am_pe<F>(&self, pe: usize, am: F) -> Self::SinglePeAmHandle<F::Output>
    where
        F: RemoteActiveMessage + LamellarAM + Serde + AmDist,
    {
        self.array.exec_am_pe_tg(pe, am)
    }
    fn exec_am_local<F>(&self, am: F) -> Self::LocalAmHandle<F::Output>
    where
        F: LamellarActiveMessage + LocalAM + 'static,
    {
        self.array.exec_am_local_tg(am)
    }
    fn wait_all(&self) {
        self.array.wait_all()
    }
    fn await_all(&self) -> impl Future<Output = ()> + Send {
        self.array.await_all()
    }
    fn barrier(&self) {
        self.array.barrier()
    }
    fn async_barrier(&self) -> BarrierHandle {
        self.array.async_barrier()
    }
    fn spawn<F: Future>(&self, f: F) -> LamellarTask<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send,
    {
        self.array.spawn(f)
    }
    fn block_on<F: Future>(&self, f: F) -> F::Output {
        self.array.block_on(f)
    }
    fn block_on_all<I>(&self, iter: I) -> Vec<<<I as IntoIterator>::Item as Future>::Output>
    where
        I: IntoIterator,
        <I as IntoIterator>::Item: Future + Send + 'static,
        <<I as IntoIterator>::Item as Future>::Output: Send,
    {
        self.array.block_on_all(iter)
    }
}
725
// Size/index queries are delegated to the inner UnsafeArray.
impl<T: Dist> LamellarArray<T> for GenericAtomicArray<T> {
    // fn team_rt(&self) -> Pin<Arc<LamellarTeamRT>> {
    //     self.array.team_rt()
    // }
    // fn my_pe(&self) -> usize {
    //     LamellarArray::my_pe(&self.array)
    // }
    // fn num_pes(&self) -> usize {
    //     LamellarArray::num_pes(&self.array)
    // }
    /// Total number of elements across all PEs.
    fn len(&self) -> usize {
        self.array.len()
    }
    /// Number of elements stored on the calling PE.
    fn num_elems_local(&self) -> usize {
        self.array.num_elems_local()
    }
    // fn barrier(&self) {
    //     self.array.barrier();
    // }

    // fn wait_all(&self) {
    //     self.array.wait_all()
    //     // println!("done in wait all {:?}",std::time::SystemTime::now());
    // }
    // fn block_on<F: Future>(&self, f: F) -> F::Output {
    //     self.array.block_on(f)
    // }
    fn pe_and_offset_for_global_index(&self, index: usize) -> Option<(usize, usize)> {
        self.array.pe_and_offset_for_global_index(index)
    }
    fn first_global_index_for_pe(&self, pe: usize) -> Option<usize> {
        self.array.first_global_index_for_pe(pe)
    }

    fn last_global_index_for_pe(&self, pe: usize) -> Option<usize> {
        self.array.last_global_index_for_pe(pe)
    }
}
764
// Environment queries (PE ids, team/world handles) delegated to the inner array.
impl<T: Dist> LamellarEnv for GenericAtomicArray<T> {
    fn my_pe(&self) -> usize {
        LamellarEnv::my_pe(&self.array)
    }

    fn num_pes(&self) -> usize {
        LamellarEnv::num_pes(&self.array)
    }
    fn num_threads_per_pe(&self) -> usize {
        self.array.team_rt().num_threads()
    }
    fn world(&self) -> Arc<LamellarTeam> {
        self.array.team_rt().world()
    }
    fn team(&self) -> Arc<LamellarTeam> {
        self.array.team_rt().team()
    }
}
783
// Marker traits: a GenericAtomicArray may be both the source and destination of RDMA-style ops.
impl<T: Dist> LamellarWrite for GenericAtomicArray<T> {}
impl<T: Dist> LamellarRead for GenericAtomicArray<T> {}
786
impl<T: Dist> SubArray<T> for GenericAtomicArray<T> {
    type Array = GenericAtomicArray<T>;
    /// Creates a sub-array over `range`; the full lock vector is shared so element
    /// locking continues to use PE-wide offsets.
    fn sub_array<R: std::ops::RangeBounds<usize>>(&self, range: R) -> Self::Array {
        GenericAtomicArray {
            locks: self.locks.clone(),
            array: self.array.sub_array(range),
        }
    }
    /// Translates a sub-array index back to an index in the original array.
    fn global_index(&self, sub_index: usize) -> usize {
        self.array.global_index(sub_index)
    }
}
799
impl<T: Dist + std::fmt::Debug> GenericAtomicArray<T> {
    #[doc(alias = "Collective")]
    /// Print the data within a lamellar array
    ///
    /// # Collective Operation
    /// Requires all PEs associated with the array to enter the print call otherwise deadlock will occur (i.e. barriers are being called internally)
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let block_array = AtomicArray::<f32>::new(&world,100,Distribution::Block).block();
    /// let cyclic_array = AtomicArray::<f32>::new(&world,100,Distribution::Cyclic).block();
    ///
    /// block_array.print();
    /// println!();
    /// cyclic_array.print();
    ///```
    pub fn print(&self) {
        self.array.print();
    }
}
822
impl<T: Dist + std::fmt::Debug> ArrayPrint<T> for GenericAtomicArray<T> {
    /// Collective print, delegated to the inner UnsafeArray.
    fn print(&self) {
        self.array.print()
    }
}
828
// Reductions: `reduce` dispatches by operation name to the runtime's registered
// reduction AMs; the convenience wrappers fix the operation string.
impl<T: Dist + AmDist + 'static> GenericAtomicArray<T> {
    #[doc(hidden)]
    /// Performs the named reduction (e.g. "sum") over the whole array.
    pub fn reduce(&self, op: &str) -> AmHandle<Option<T>> {
        self.array.reduce_data(op, self.clone().into())
    }
}
impl<T: Dist + AmDist + ElementArithmeticOps + 'static> GenericAtomicArray<T> {
    #[doc(hidden)]
    /// Sum of all elements.
    pub fn sum(&self) -> AmHandle<Option<T>> {
        self.reduce("sum")
    }
    #[doc(hidden)]
    /// Product of all elements.
    pub fn prod(&self) -> AmHandle<Option<T>> {
        self.reduce("prod")
    }
}
impl<T: Dist + AmDist + ElementComparePartialEqOps + 'static> GenericAtomicArray<T> {
    #[doc(hidden)]
    /// Maximum element.
    pub fn max(&self) -> AmHandle<Option<T>> {
        self.reduce("max")
    }
    #[doc(hidden)]
    /// Minimum element.
    pub fn min(&self) -> AmHandle<Option<T>> {
        self.reduce("min")
    }
}
855
#[doc(hidden)]
/// A standalone mutex-protected value providing the same atomic-style API as
/// [`GenericAtomicElement`] but not backed by a distributed array.
pub struct LocalGenericAtomicElement<T> {
    pub(crate) val: Mutex<T>,
}
860
impl<T: Dist> From<LocalGenericAtomicElement<T>> for AtomicElement<T> {
    /// Wraps the element in the type-erased [`AtomicElement`] enum.
    fn from(element: LocalGenericAtomicElement<T>) -> AtomicElement<T> {
        AtomicElement::LocalGenericAtomicElement(element)
    }
}
866
867impl<T: Dist> LocalGenericAtomicElement<T> {
868    pub fn load(&self) -> T {
869        *self.val.lock()
870    }
871    pub fn store(&self, val: T) {
872        *self.val.lock() = val
873    }
874    pub fn swap(&self, val: T) -> T {
875        let old = *self.val.lock();
876        *self.val.lock() = val;
877        old
878    }
879}
880impl<T: ElementArithmeticOps> LocalGenericAtomicElement<T> {
881    pub fn fetch_add(&self, val: T) -> T {
882        let old = *self.val.lock();
883        *self.val.lock() += val;
884        old
885    }
886    pub fn fetch_sub(&self, val: T) -> T {
887        let old = *self.val.lock();
888        *self.val.lock() -= val;
889        old
890    }
891    pub fn fetch_mul(&self, val: T) -> T {
892        let old = *self.val.lock();
893        *self.val.lock() *= val;
894        old
895    }
896    pub fn fetch_div(&self, val: T) -> T {
897        let old = *self.val.lock();
898        *self.val.lock() /= val;
899        old
900    }
901    pub fn fetch_rem(&self, val: T) -> T {
902        let old = *self.val.lock();
903        *self.val.lock() %= val;
904        old
905    }
906}
907
908impl<T: Dist + std::cmp::Eq> LocalGenericAtomicElement<T> {
909    pub fn compare_exchange(&self, current: T, new: T) -> Result<T, T> {
910        let current_val = *self.val.lock();
911        if current_val == current {
912            *self.val.lock() = new;
913
914            Ok(current_val)
915        } else {
916            Err(current_val)
917        }
918    }
919}
920impl<T: Dist + std::cmp::PartialEq + std::cmp::PartialOrd + std::ops::Sub<Output = T>>
921    LocalGenericAtomicElement<T>
922{
923    pub fn compare_exchange_epsilon(&self, current: T, new: T, eps: T) -> Result<T, T> {
924        let current_val = *self.val.lock();
925        let same = if current_val > current {
926            current_val - current < eps
927        } else {
928            current - current_val < eps
929        };
930        if same {
931            *self.val.lock() = new;
932
933            Ok(current_val)
934        } else {
935            Err(current_val)
936        }
937    }
938}
939
940impl<T: ElementBitWiseOps + 'static> LocalGenericAtomicElement<T> {
941    pub fn fetch_and(&self, val: T) -> T {
942        let old = *self.val.lock();
943        *self.val.lock() &= val;
944        old
945    }
946    pub fn fetch_or(&self, val: T) -> T {
947        let old = *self.val.lock();
948        *self.val.lock() |= val;
949        old
950    }
951    pub fn fetch_xor(&self, val: T) -> T {
952        let old = *self.val.lock();
953        *self.val.lock() ^= val;
954        old
955    }
956}
957
958impl<T: ElementShiftOps + 'static> LocalGenericAtomicElement<T> {
959    pub fn fetch_shl(&self, val: T) -> T {
960        let old = *self.val.lock();
961        *self.val.lock() <<= val;
962        old
963    }
964    pub fn fetch_shr(&self, val: T) -> T {
965        let old = *self.val.lock();
966        *self.val.lock() >>= val;
967        old
968    }
969}
970
// Operator-assign impls. Each is a single read-modify-write performed while the
// mutex guard is held (one `lock()` call per op), so these are atomic as written.
impl<T: Dist + ElementArithmeticOps> AddAssign<T> for LocalGenericAtomicElement<T> {
    fn add_assign(&mut self, val: T) {
        // self.add(val)
        *self.val.lock() += val
    }
}

impl<T: Dist + ElementArithmeticOps> SubAssign<T> for LocalGenericAtomicElement<T> {
    fn sub_assign(&mut self, val: T) {
        *self.val.lock() -= val
    }
}

impl<T: Dist + ElementArithmeticOps> MulAssign<T> for LocalGenericAtomicElement<T> {
    fn mul_assign(&mut self, val: T) {
        *self.val.lock() *= val
    }
}

impl<T: Dist + ElementArithmeticOps> DivAssign<T> for LocalGenericAtomicElement<T> {
    fn div_assign(&mut self, val: T) {
        *self.val.lock() /= val
    }
}

impl<T: Dist + ElementArithmeticOps> RemAssign<T> for LocalGenericAtomicElement<T> {
    fn rem_assign(&mut self, val: T) {
        *self.val.lock() %= val
    }
}

impl<T: Dist + ElementBitWiseOps> BitAndAssign<T> for LocalGenericAtomicElement<T> {
    fn bitand_assign(&mut self, val: T) {
        *self.val.lock() &= val
    }
}

impl<T: Dist + ElementBitWiseOps> BitOrAssign<T> for LocalGenericAtomicElement<T> {
    fn bitor_assign(&mut self, val: T) {
        *self.val.lock() |= val
    }
}

impl<T: Dist + ElementBitWiseOps> BitXorAssign<T> for LocalGenericAtomicElement<T> {
    fn bitxor_assign(&mut self, val: T) {
        *self.val.lock() ^= val
    }
}

impl<T: Dist + ElementShiftOps> ShlAssign<T> for LocalGenericAtomicElement<T> {
    fn shl_assign(&mut self, val: T) {
        // explicit method-call form to shift the value inside the guard
        self.val.lock().shl_assign(val)
    }
}

impl<T: Dist + ElementShiftOps> ShrAssign<T> for LocalGenericAtomicElement<T> {
    fn shr_assign(&mut self, val: T) {
        self.val.lock().shr_assign(val)
    }
}
1031
impl<T: Dist + std::fmt::Debug> std::fmt::Debug for LocalGenericAtomicElement<T> {
    /// Formats the current value; the read happens under the mutex.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let current_val = *self.val.lock();
        write!(f, "{current_val:?}")
    }
}