lamellar/array/
unsafe.rs

1//! This module provides an unsafe abstraction of a distributed array in Lamellar.
2
3pub(crate) mod handle;
4pub use handle::*;
5mod iteration;
6pub(crate) mod local_chunks;
7pub use local_chunks::*;
8pub(crate) mod operations;
9pub use operations::*;
10mod rdma;
11
12
13use crate::active_messaging::ActiveMessaging;
14use crate::active_messaging::*;
15// use crate::array::r#unsafe::operations::BUFOPS;
16use crate::array::private::{ArrayExecAm, LamellarArrayPrivate};
17use crate::array::*;
18use crate::array::{LamellarRead, LamellarWrite};
19use crate::barrier::BarrierHandle;
20use crate::darc::{Darc, DarcMode, WeakDarc};
21use crate::env_var::config;
22use crate::lamellae::AllocationType;
23use crate::lamellar_team::{IntoLamellarTeam, LamellarTeamRT};
24use crate::memregion::{Dist, MemoryRegion};
25use crate::scheduler::LamellarTask;
26use crate::warnings::RuntimeWarning;
27use crate::LamellarTaskGroup;
28
29use core::marker::PhantomData;
30use futures_util::{future, StreamExt};
31use std::ops::Bound;
32use std::pin::Pin;
33use std::sync::atomic::{AtomicUsize, Ordering};
34use std::sync::Arc;
35use std::time::Instant;
36
/// Shared, team-wide state backing an `UnsafeArray`, distributed via a `Darc`.
pub(crate) struct UnsafeArrayData {
    // raw backing memory for this PE's portion of the array (byte granularity;
    // created in `async_new` via `MemoryRegion::to_base::<u8>()`)
    mem_region: MemoryRegion<u8>,
    // active-message counters for requests issued through this array (see `await_all`)
    pub(crate) array_counters: Arc<AMCounters>,
    // runtime team this array was allocated on
    pub(crate) team: Pin<Arc<LamellarTeamRT>>,
    // task group awaited at the end of `await_all`
    pub(crate) task_group: Arc<LamellarTaskGroup>,
    pub(crate) my_pe: usize,
    pub(crate) num_pes: usize,
    // outstanding request count -- presumably incremented/decremented by the
    // operations modules; usage not visible in this chunk
    req_cnt: Arc<AtomicUsize>,
}
46
47impl std::fmt::Debug for UnsafeArrayData {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        write!(
50            f,
51            "UnsafeArrayData{{ mem_region: {:?}, array_counters: {:?}, team: {:?}, task_group: {:?}, my_pe: {:?}, num_pes: {:?} req_cnt: {:?} }}",
52            self.mem_region, self.array_counters, self.team, self.task_group, self.my_pe, self.num_pes, self.req_cnt
53        )
54    }
55}
56
/// An unsafe abstraction of a distributed array.
///
/// This array type provides no guarantees on how its data is being accessed, either locally or from remote PEs.
///
/// UnsafeArrays are really intended for use in the runtime as the foundation for our safe array types.
///
/// # Warning
/// Unless you are very confident in low level distributed memory access it is highly recommended you utilize
/// the other LamellarArray types ([AtomicArray], [LocalLockArray], [GlobalLockArray], [ReadOnlyArray]) to construct and interact with distributed memory.
#[lamellar_impl::AmDataRT(Clone, Debug)]
pub struct UnsafeArray<T> {
    pub(crate) inner: UnsafeArrayInner,
    // zero-sized marker tying the untyped inner storage to element type `T`
    phantom: PhantomData<T>,
}
71
#[doc(hidden)]
/// Untyped (byte-oriented) view over the same inner state as an `UnsafeArray<T>`;
/// used internally by the runtime.
#[lamellar_impl::AmDataRT(Clone, Debug)]
pub struct UnsafeByteArray {
    pub(crate) inner: UnsafeArrayInner,
}
77
78impl UnsafeByteArray {
79    pub(crate) fn downgrade(array: &UnsafeByteArray) -> UnsafeByteArrayWeak {
80        UnsafeByteArrayWeak {
81            inner: UnsafeArrayInner::downgrade(&array.inner),
82        }
83    }
84}
85
#[doc(hidden)]
/// Weak counterpart to [UnsafeByteArray]; must be `upgrade`d before use.
#[lamellar_impl::AmLocalDataRT(Clone, Debug)]
pub struct UnsafeByteArrayWeak {
    pub(crate) inner: UnsafeArrayInnerWeak,
}
91
92impl UnsafeByteArrayWeak {
93    pub fn upgrade(&self) -> Option<UnsafeByteArray> {
94        if let Some(inner) = self.inner.upgrade() {
95            Some(UnsafeByteArray { inner })
96        } else {
97            None
98        }
99    }
100}
101
pub(crate) mod private {
    use super::UnsafeArrayData;
    use crate::array::Distribution;
    use crate::darc::Darc;
    /// Per-handle view state over the shared [UnsafeArrayData]; cheap to clone.
    #[lamellar_impl::AmDataRT(Clone, Debug)]
    pub struct UnsafeArrayInner {
        pub(crate) data: Darc<UnsafeArrayData>,
        pub(crate) distribution: Distribution,
        // elements per PE before accounting for any remainder (set in `async_new`)
        pub(crate) orig_elem_per_pe: usize,
        // elements that can't be evenly divided amongst all PEs (full_size % num_pes)
        pub(crate) orig_remaining_elems: usize,
        pub(crate) elem_size: usize, //for bytes array will be size of T, for T array will be 1
        // NOTE(review): `async_new` sets elem_size to size_of::<T>() for the T array;
        // the comment above may be stale/inverted -- verify against the byte-array path
        pub(crate) offset: usize,    //relative to size of T
        pub(crate) size: usize,      //relative to size of T
        // presumably true when this handle is a sub-array view (see `sub_array`,
        // not in this chunk); `async_new` initializes it to false
        pub(crate) sub: bool,
    }
}
118use private::UnsafeArrayInner;
119
/// Weak counterpart to `private::UnsafeArrayInner` (holds a `WeakDarc` instead of a `Darc`).
#[lamellar_impl::AmLocalDataRT(Clone, Debug)]
pub(crate) struct UnsafeArrayInnerWeak {
    pub(crate) data: WeakDarc<UnsafeArrayData>,
    pub(crate) distribution: Distribution,
    orig_elem_per_pe: usize,
    orig_remaining_elems: usize, // the number of elements that can't be evenly divided amongst all PES
    elem_size: usize,            //for bytes array will be size of T, for T array will be 1
    offset: usize,               //relative to size of T
    size: usize,                 //relative to size of T
    sub: bool,                   // presumably marks sub-array views -- mirrors UnsafeArrayInner::sub
}
131
132// impl Drop for UnsafeArrayInner {
133//     fn drop(&mut self) {
134//         // println!("unsafe array inner dropping");
135//     }
136// }
137
138// impl<T: Dist> Drop for UnsafeArray<T> {
139//     fn drop(&mut self) {
140//         println!("Dropping unsafe array");
141//         // self.wait_all();
142//     }
143// }
144
145impl<T: Dist + ArrayOps + 'static> UnsafeArray<T> {
    #[doc(alias = "Collective")]
    /// Construct a new UnsafeArray with a length of `array_size` whose data will be laid out with the provided `distribution` on the PE's specified by the `team`.
    /// `team` is commonly a [LamellarWorld][crate::LamellarWorld] or [LamellarTeam] (instance or reference).
    ///
    /// # Collective Operation
    /// Requires all PEs associated with the `team` to enter the constructor call otherwise deadlock will occur (i.e. team barriers are being called internally)
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    ///
    /// let world = LamellarWorldBuilder::new().build();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///```
    pub fn new<U: Into<IntoLamellarTeam>>(
        team: U,
        array_size: usize,
        distribution: Distribution,
    ) -> UnsafeArrayHandle<T> {
        let team = team.into().team.clone();
        // the handle owns the (lazy) construction future; the caller must
        // block on or spawn the handle to actually perform the collective
        UnsafeArrayHandle {
            team: team.clone(),
            launched: false,
            creation_future: Box::pin(UnsafeArray::async_new(
                team,
                array_size,
                distribution,
                DarcMode::UnsafeArray,
            )),
        }
    }
177
    /// Collective, asynchronous construction of the array's backing memory and
    /// shared state. Every PE in `team` must participate (an async team barrier
    /// is awaited on entry). `darc_mode` selects which array flavor the backing
    /// Darc is created as.
    pub(crate) async fn async_new<U: Into<IntoLamellarTeam>>(
        team: U,
        array_size: usize,
        distribution: Distribution,
        darc_mode: DarcMode,
    ) -> UnsafeArray<T> {
        let team = team.into().team.clone();
        team.async_barrier().await;
        let task_group = LamellarTaskGroup::new(team.clone());
        let my_pe = team.team_pe_id().unwrap();
        let num_pes = team.num_pes();
        // Allocate at least one element per PE; if the requested size is smaller
        // than the team, a sub-array of the requested length is returned below.
        let full_array_size = std::cmp::max(array_size, num_pes);

        let elem_per_pe = full_array_size / num_pes;
        let remaining_elems = full_array_size % num_pes;
        // each PE allocates enough room for one extra element when the size
        // does not divide evenly
        let mut per_pe_size = elem_per_pe;
        if remaining_elems > 0 {
            per_pe_size += 1
        }
        // Global allocation when the team spans the whole world, otherwise a
        // sub-allocation restricted to the team's PEs.
        let rmr_t: MemoryRegion<T> = if team.num_world_pes == team.num_pes {
            MemoryRegion::new(per_pe_size, team.lamellae.clone(), AllocationType::Global)
        } else {
            MemoryRegion::new(
                per_pe_size,
                team.lamellae.clone(),
                AllocationType::Sub(team.get_pes()),
            )
        };

        unsafe {
            if std::mem::needs_drop::<T>() {
                // If `T` needs to be dropped then we have to do this one item at a time, in
                // case one of the intermediate drops does a panic.
                panic!("Lamellar Arrays do not yet support elements that impl Drop");
            } else {
                // Otherwise we can be really fast and just fill everthing with zeros.
                let len = std::mem::size_of_val::<[T]>(
                    rmr_t.as_mut_slice().expect("data should exist on pe"),
                );
                std::ptr::write_bytes(
                    rmr_t.as_mut_ptr().expect("data should exist on pe") as *mut u8,
                    0u8,
                    len,
                )
            }
        }
        // store the zeroed region untyped; element-size bookkeeping lives in the inner
        let rmr = unsafe { rmr_t.to_base::<u8>() };

        let data = Darc::async_try_new_with_drop(
            team.clone(),
            UnsafeArrayData {
                mem_region: rmr,
                array_counters: Arc::new(AMCounters::new()),
                team: team.clone(),
                task_group: Arc::new(task_group),
                my_pe: my_pe,
                num_pes: num_pes,
                req_cnt: Arc::new(AtomicUsize::new(0)),
            },
            darc_mode,
            None,
        )
        .await
        .expect("trying to create array on non team member");
        let array = UnsafeArray {
            inner: UnsafeArrayInner {
                data: data,
                distribution: distribution.clone(),
                orig_elem_per_pe: elem_per_pe,
                orig_remaining_elems: remaining_elems,
                elem_size: std::mem::size_of::<T>(),
                offset: 0,             //relative to size of T
                size: full_array_size, //relative to size of T
                sub: false,
            },
            phantom: PhantomData,
        };
        // if we padded the allocation up to num_pes, hand back a view of only
        // the requested elements
        if full_array_size != array_size {
            println!("WARNING: Array size {array_size} is less than number of pes {full_array_size}, each PE will not contain data");
            array.sub_array(0..array_size)
        } else {
            array
        }
    }
275}
276impl<T: Dist + 'static> UnsafeArray<T> {
277    #[doc(alias("One-sided", "onesided"))]
278    /// Change the distribution this array handle uses to index into the data of the array.
279    ///
280    /// # One-sided Operation
281    /// This is a one-sided call and does not redistribute the modify actual data, it simply changes how the array is indexed for this particular handle.
282    ///
283    /// # Examples
284    ///```
285    /// use lamellar::array::prelude::*;
286    /// let world = LamellarWorldBuilder::new().build();
287    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
288    /// // do something interesting... or not
289    /// let block_view = array.clone().use_distribution(Distribution::Block);
290    ///```
291    pub fn use_distribution(mut self, distribution: Distribution) -> Self {
292        self.inner.distribution = distribution;
293        self
294    }
295
296    #[doc(alias("One-sided", "onesided"))]
297    /// Return the calling PE's local data as an immutable slice
298    ///
299    /// # Safety
300    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's may access this PE's local data.
301    /// It is also possible to have mutable and immutable references to this arrays data on the same PE
302    ///
303    /// # One-sided Operation
304    /// Only returns local data on the calling PE
305    ///
306    /// # Examples
307    ///```
308    /// use lamellar::array::prelude::*;
309    /// let world = LamellarWorldBuilder::new().build();
310    /// let my_pe = world.my_pe();
311    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
312    ///
313    /// unsafe {
314    ///     let slice = array.local_as_slice();
315    ///     println!("PE{my_pe} data: {slice:?}");
316    /// }
317    ///```
318    pub unsafe fn local_as_slice(&self) -> &[T] {
319        self.local_as_mut_slice()
320    }
321
322    #[doc(alias("One-sided", "onesided"))]
323    /// Return the calling PE's local data as a mutable slice
324    ///
325    /// # Safety
326    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's may access this PE's local data.
327    /// It is also possible to have mutable and immutable references to this arrays data on the same PE
328    ///
329    /// # One-sided Operation
330    /// Only returns local data on the calling PE
331    ///
332    /// # Examples
333    ///```
334    /// use lamellar::array::prelude::*;
335    /// let world = LamellarWorldBuilder::new().build();
336    /// let my_pe = world.my_pe();
337    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
338    ///
339    /// unsafe {
340    ///     let slice =  array.local_as_mut_slice();
341    ///     for elem in slice{
342    ///         *elem += 1;
343    ///     }
344    /// }
345    ///```
346    pub unsafe fn local_as_mut_slice(&self) -> &mut [T] {
347        let u8_slice = self.inner.local_as_mut_slice();
348        // println!("u8 slice {:?} u8_len {:?} len {:?}",u8_slice,u8_slice.len(),u8_slice.len()/std::mem::size_of::<T>());
349        std::slice::from_raw_parts_mut(
350            u8_slice.as_mut_ptr() as *mut T,
351            u8_slice.len() / std::mem::size_of::<T>(),
352        )
353    }
354
355    #[doc(alias("One-sided", "onesided"))]
356    /// Return the calling PE's local data as an immutable slice
357    ///
358    /// # Safety
359    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's may access this PE's local data.
360    /// It is also possible to have mutable and immutable references to this arrays data on the same PE
361    ///
362    /// # One-sided Operation
363    /// Only returns local data on the calling PE
364    ///
365    /// # Examples
366    ///```
367    /// use lamellar::array::prelude::*;
368    /// let world = LamellarWorldBuilder::new().build();
369    /// let my_pe = world.my_pe();
370    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
371    /// unsafe {
372    ///     let slice = array.local_data();
373    ///     println!("PE{my_pe} data: {slice:?}");
374    /// }
375    ///```
376    pub unsafe fn local_data(&self) -> &[T] {
377        self.local_as_mut_slice()
378    }
379
380    #[doc(alias("One-sided", "onesided"))]
381    /// Return the calling PE's local data as a mutable slice
382    ///
383    /// # Safety
384    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's may access this PE's local data.
385    /// It is also possible to have mutable and immutable references to this arrays data on the same PE
386    ///
387    /// # One-sided Operation
388    /// Only returns local data on the calling PE
389    ///
390    /// # Examples
391    ///```
392    /// use lamellar::array::prelude::*;
393    /// let world = LamellarWorldBuilder::new().build();
394    /// let my_pe = world.my_pe();
395    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
396    ///
397    /// unsafe {
398    ///     let slice = array.mut_local_data();
399    ///     for elem in slice{
400    ///         *elem += 1;
401    ///     }
402    /// }
403    ///```
404    pub unsafe fn mut_local_data(&self) -> &mut [T] {
405        self.local_as_mut_slice()
406    }
407
408    pub(crate) fn local_as_mut_ptr(&self) -> *mut T {
409        let u8_ptr = unsafe { self.inner.local_as_mut_ptr() };
410        // self.inner.data.mem_region.as_casted_mut_ptr::<T>().unwrap();
411        // println!("ptr: {:?} {:?}", u8_ptr, u8_ptr as *const T);
412        u8_ptr as *mut T
413    }
414
415    /// Return the index range with respect to the original array over which this array handle represents)
416    ///
417    /// # Examples
418    ///```
419    /// use lamellar::array::prelude::*;
420    /// let world = LamellarWorldBuilder::new().build();
421    /// let my_pe = world.my_pe();
422    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
423    ///
424    /// assert_eq!(array.sub_array_range(),(0..100));
425    ///
426    /// let sub_array = array.sub_array(25..75);
427    /// assert_eq!(sub_array.sub_array_range(),(25..75));
428    ///```
429    pub fn sub_array_range(&self) -> std::ops::Range<usize> {
430        self.inner.offset..(self.inner.offset + self.inner.size)
431    }
432
    // Clone of the runtime team handle backing this array.
    pub(crate) fn team_rt(&self) -> Pin<Arc<LamellarTeamRT>> {
        self.inner.data.team.clone()
    }
436
    /// Wait (asynchronously) until every request issued through this array has
    /// been spawned and completed, then wait on the array's task group.
    pub(crate) async fn await_all(&self) {
        let am_counters = self.inner.data.array_counters.clone();

        let mut temp_now = Instant::now();
        // snapshot the counters so we can detect work submitted while waiting
        let mut orig_reqs = am_counters.send_req_cnt.load(Ordering::SeqCst);
        let mut orig_launched = am_counters.launched_req_cnt.load(Ordering::SeqCst);
        let mut done = false;
        while !done {
            // yield-spin while requests remain outstanding or new ones keep
            // arriving; exit early if the team has panicked
            while self.team().panic.load(Ordering::SeqCst) == 0
                && ((am_counters.outstanding_reqs.load(Ordering::SeqCst) > 0)
                    || orig_reqs != am_counters.send_req_cnt.load(Ordering::SeqCst)
                    || orig_launched != am_counters.launched_req_cnt.load(Ordering::SeqCst))
            {
                orig_reqs = am_counters.send_req_cnt.load(Ordering::SeqCst);
                orig_launched = am_counters.launched_req_cnt.load(Ordering::SeqCst);
                async_std::task::yield_now().await;
                // periodic progress report if we appear deadlocked
                // NOTE(review): message says "darc await_all" -- looks copy-pasted
                // from the darc implementation; confirm before changing the text
                if temp_now.elapsed().as_secs_f64() > config().deadlock_timeout {
                    println!(
                        "in darc await_all mype: {:?} cnt: {:?} {:?}",
                        self.team_rt().world_pe,
                        am_counters.send_req_cnt.load(Ordering::SeqCst),
                        am_counters.outstanding_reqs.load(Ordering::SeqCst),
                    );
                    temp_now = Instant::now();
                }
            }
            // some requests were created but never spawned: warn the user
            // (unless more work appeared while we were checking)
            if am_counters.send_req_cnt.load(Ordering::SeqCst)
                != am_counters.launched_req_cnt.load(Ordering::SeqCst)
            {
                if am_counters.outstanding_reqs.load(Ordering::SeqCst) > 0
                    || orig_reqs != am_counters.send_req_cnt.load(Ordering::SeqCst)
                    || orig_launched != am_counters.launched_req_cnt.load(Ordering::SeqCst)
                {
                    // new activity detected -- go around the outer loop again
                    continue;
                }
                println!(
                    "in darc await_all mype: {:?} cnt: {:?} {:?} {:?}",
                    self.team_rt().world_pe,
                    am_counters.send_req_cnt.load(Ordering::SeqCst),
                    am_counters.outstanding_reqs.load(Ordering::SeqCst),
                    am_counters.launched_req_cnt.load(Ordering::SeqCst)
                );
                RuntimeWarning::UnspawnedTask(
                    "`await_all` before all tasks/active messages have been spawned",
                )
                .print();
            }
            done = true;
        }

        // finally drain any tasks tracked by the array's task group
        self.inner.data.task_group.await_all().await;
    }
490
491    pub(crate) async fn await_on_outstanding(&self, mode: DarcMode) {
492        self.await_all().await;
493        // println!("block on outstanding");
494        // self.inner.data.print();
495        // let the_array: UnsafeArray<T> = self.clone();
496        let array_darc = self.inner.data.clone();
497        array_darc.block_on_outstanding(mode, 1).await;
498    }
499
500    #[doc(alias = "Collective")]
501    /// Convert this UnsafeArray into a (safe) [ReadOnlyArray]
502    ///
503    /// This is a collective and blocking function which will only return when there is at most a single reference on each PE
504    /// to this Array, and that reference is currently calling this function.
505    ///
506    /// When it returns, it is gauranteed that there are only  `ReadOnlyArray` handles to the underlying data
507    ///
508    /// # Collective Operation
509    /// Requires all PEs associated with the `array` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally)
510    ///
511    /// # Examples
512    ///```
513    /// use lamellar::array::prelude::*;
514    /// let world = LamellarWorldBuilder::new().build();
515    /// let my_pe = world.my_pe();
516    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
517    ///
518    /// let read_only_array = array.into_read_only().block();
519    ///```
520    ///
521    /// # Warning
522    /// Because this call blocks there is the possibility for deadlock to occur, as highlighted below:
523    ///```no_run
524    /// use lamellar::array::prelude::*;
525    /// let world = LamellarWorldBuilder::new().build();
526    /// let my_pe = world.my_pe();
527    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
528    ///
529    /// let array1 = array.clone();
530    /// let mut_slice = unsafe {array1.local_as_mut_slice()};
531    ///
532    /// // no borrows to this specific instance (array) so it can enter the "into_read_only" call
533    /// // but array1 will not be dropped until after mut_slice is dropped.
534    /// // Given the ordering of these calls we will get stuck in "into_read_only" as it
535    /// // waits for the reference count to go down to "1" (but we will never be able to drop mut_slice/array1).
536    /// let ro_array = array.into_read_only().block();
537    /// ro_array.print();
538    /// println!("{mut_slice:?}");
539    ///```
540    /// Instead we would want to do something like:
541    ///```
542    /// use lamellar::array::prelude::*;
543    /// let world = LamellarWorldBuilder::new().build();
544    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
545    ///
546    /// let array1 = array.clone();
547    /// let slice = unsafe {array1.local_data()};
548    ///
549    /// // do something interesting with the slice and then manually drop the slice and array1 to ensure the reference count for array is 1.
550    /// println!("{:?}",slice[0]);
551    /// drop(slice);
552    /// drop(array1);
553    /// // now we can call into_read_only and it will not deadlock
554    /// let ro_array = array.into_read_only().block();
555    /// ro_array.print();
556    ///```
557    pub fn into_read_only(self) -> IntoReadOnlyArrayHandle<T> {
558        // println!("unsafe into read only");
559        IntoReadOnlyArrayHandle {
560            team: self.team_rt(),
561            launched: false,
562            outstanding_future: Box::pin(self.async_into()),
563        }
564    }
565
566    // pub fn into_local_only(self) -> LocalOnlyArray<T> {
567    //     // println!("unsafe into local only");
568    //     self.into()
569    // }
570
571    #[doc(alias = "Collective")]
572    /// Convert this UnsafeArray into a (safe) [LocalLockArray]
573    ///
574    /// This is a collective and blocking function which will only return when there is at most a single reference on each PE
575    /// to this Array, and that reference is currently calling this function.
576    ///
577    /// When it returns, it is gauranteed that there are only `LocalLockArray` handles to the underlying data
578    ///
579    /// # Collective Operation
580    /// Requires all PEs associated with the `array` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally)
581    ///
582    /// # Examples
583    ///```
584    /// use lamellar::array::prelude::*;
585    /// let world = LamellarWorldBuilder::new().build();
586    /// let my_pe = world.my_pe();
587    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
588    ///
589    /// let local_lock_array = array.into_local_lock().block();
590    ///```
591    /// # Warning
592    /// Because this call blocks there is the possibility for deadlock to occur, as highlighted below:
593    ///```no_run
594    /// use lamellar::array::prelude::*;
595    /// let world = LamellarWorldBuilder::new().build();
596    /// let my_pe = world.my_pe();
597    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
598    ///
599    /// let array1 = array.clone();
600    /// let mut_slice = unsafe {array1.local_as_mut_slice()};
601    ///
602    /// // no borrows to this specific instance (array) so it can enter the "into_local_lock" call
603    /// // but array1 will not be dropped until after mut_slice is dropped.
604    /// // Given the ordering of these calls we will get stuck in "iinto_local_lock" as it
605    /// // waits for the reference count to go down to "1" (but we will never be able to drop mut_slice/array1).
606    /// let local_lock_array = array.into_local_lock().block();
607    /// local_lock_array.print();
608    /// println!("{mut_slice:?}");
609    ///```
610    /// Instead we would want to do something like:
611    ///```
612    /// use lamellar::array::prelude::*;
613    /// let world = LamellarWorldBuilder::new().build();
614    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
615    ///
616    /// let array1 = array.clone();
617    /// let slice = unsafe {array1.local_data()};
618    ///
619    /// // do something interesting with the slice and then manually drop the slice and array1 to ensure the reference count for array is 1.
620    /// println!("{:?}",slice[0]);
621    /// drop(slice);
622    /// drop(array1);
623    /// // now we can call into_local_lock and it will not deadlock
624    /// let local_lock_array = array.into_local_lock().block();
625    /// local_lock_array.print();
626    ///```
627    pub fn into_local_lock(self) -> IntoLocalLockArrayHandle<T> {
628        // println!("unsafe into local lock atomic");
629        // self.into()
630
631        IntoLocalLockArrayHandle {
632            team: self.team_rt(),
633            launched: false,
634            outstanding_future: Box::pin(self.async_into()),
635        }
636    }
637
638    #[doc(alias = "Collective")]
639    /// Convert this UnsafeArray into a (safe) [GlobalLockArray]
640    ///
641    /// This is a collective and blocking function which will only return when there is at most a single reference on each PE
642    /// to this Array, and that reference is currently calling this function.
643    ///
644    /// When it returns, it is gauranteed that there are only `GlobalLockArray` handles to the underlying data
645    ///
646    /// # Collective Operation
647    /// Requires all PEs associated with the `array` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally)
648    ///
649    /// # Examples
650    ///```
651    /// use lamellar::array::prelude::*;
652    /// let world = LamellarWorldBuilder::new().build();
653    /// let my_pe = world.my_pe();
654    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
655    ///
656    /// let global_lock_array = array.into_global_lock().block();
657    ///```
658    /// # Warning
659    /// Because this call blocks there is the possibility for deadlock to occur, as highlighted below:
660    ///```no_run
661    /// use lamellar::array::prelude::*;
662    /// let world = LamellarWorldBuilder::new().build();
663    /// let my_pe = world.my_pe();
664    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
665    ///
666    /// let array1 = array.clone();
667    /// let slice = unsafe {array1.local_data()};
668    ///
669    /// // no borrows to this specific instance (array) so it can enter the "into_global_lock" call
670    /// // but array1 will not be dropped until after mut_slice is dropped.
671    /// // Given the ordering of these calls we will get stuck in "into_global_lock" as it
672    /// // waits for the reference count to go down to "1" (but we will never be able to drop slice/array1).
673    /// let global_lock_array = array.into_global_lock().block();
674    /// global_lock_array.print();
675    /// println!("{slice:?}");
676    ///```
677    /// Instead we would want to do something like:
678    ///```
679    /// use lamellar::array::prelude::*;
680    /// let world = LamellarWorldBuilder::new().build();
681    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
682    ///
683    /// let array1 = array.clone();
684    /// let slice = unsafe {array1.local_data()};
685    ///
686    /// // do something interesting with the slice and then manually drop the slice and array1 to ensure the reference count for array is 1.
687    /// println!("{:?}",slice[0]);
688    /// drop(slice);
689    /// drop(array1);
690    /// // now we can call into_global_lock and it will not deadlock
691    /// let global_lock_array = array.into_global_lock().block();
692    /// global_lock_array.print();
693    ///```
694    pub fn into_global_lock(self) -> IntoGlobalLockArrayHandle<T> {
695        // println!("readonly into_global_lock");
696        IntoGlobalLockArrayHandle {
697            team: self.team_rt(),
698            launched: false,
699            outstanding_future: Box::pin(self.async_into()),
700        }
701    }
702
703    pub(crate) fn async_barrier(&self) -> BarrierHandle {
704        self.inner.data.team.async_barrier()
705    }
706}
707
impl<T: Dist + 'static> UnsafeArray<T> {
    #[doc(alias = "Collective")]
    /// Convert this UnsafeArray into a (safe) [AtomicArray]
    ///
    /// This is a collective and blocking function which will only return when there is at most a single reference on each PE
    /// to this Array, and that reference is currently calling this function.
    ///
    /// When it returns, it is guaranteed that there are only `AtomicArray` handles to the underlying data
    ///
    /// # Collective Operation
    /// Requires all PEs associated with the `array` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally)
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let atomic_array = array.into_atomic().block();
    ///```
    /// # Warning
    /// Because this call blocks there is the possibility for deadlock to occur, as highlighted below:
    ///```no_run
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let array1 = array.clone();
    /// let mut_slice = unsafe {array1.local_as_mut_slice()};
    ///
    /// // no borrows to this specific instance (array) so it can enter the "into_atomic" call
    /// // but array1 will not be dropped until after mut_slice is dropped.
    /// // Given the ordering of these calls we will get stuck in "into_atomic" as it
    /// // waits for the reference count to go down to "1" (but we will never be able to drop mut_slice/array1).
    /// let atomic_array = array.into_atomic().block();
    /// atomic_array.print();
    /// println!("{mut_slice:?}");
    ///```
    /// Instead we would want to do something like:
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let array1 = array.clone();
    /// let slice = unsafe {array1.local_data()};
    ///
    /// // do something interesting with the slice and then manually drop the slice and array1 to ensure the reference count for array is 1.
    /// println!("{:?}",slice[0]);
    /// drop(slice);
    /// drop(array1);
    /// // now we can call into_atomic and it will not deadlock
    /// let atomic_array = array.into_atomic().block();
    /// atomic_array.print();
    ///```
    pub fn into_atomic(self) -> IntoAtomicArrayHandle<T> {
        // println!("unsafe into atomic");
        IntoAtomicArrayHandle {
            team: self.team_rt(),
            launched: false,
            outstanding_future: Box::pin(self.async_into()),
        }
    }
}
774
775// use crate::array::private::LamellarArrayPrivate;
776// impl <T: Dist, A: LamellarArrayPrivate<T>> From<A> for UnsafeArray<T>{
777//     fn from(array: A) -> Self {
778//        let array = array.into_inner();
779//        array.block_on_outstanding(DarcMode::UnsafeArray);
780//        array.create_buffered_ops();
781//        array
782//     }
783// }
784
785impl<T: Dist + ArrayOps> TeamFrom<(Vec<T>, Distribution)> for UnsafeArray<T> {
786    fn team_from(input: (Vec<T>, Distribution), team: &Arc<LamellarTeam>) -> Self {
787        let (vals, distribution) = input;
788        let input = (&vals, distribution);
789        TeamInto::team_into(input, team)
790    }
791}
792
793// #[async_trait]
794impl<T: Dist + ArrayOps> AsyncTeamFrom<(Vec<T>, Distribution)> for UnsafeArray<T> {
795    async fn team_from(input: (Vec<T>, Distribution), team: &Arc<LamellarTeam>) -> Self {
796        let (local_vals, distribution) = input;
797        let team = team.team.clone();
798        // println!("local_vals len: {:?}", local_vals.len());
799        team.async_barrier().await;
800        let local_sizes = UnsafeArray::<usize>::async_new(
801            team.clone(),
802            team.num_pes,
803            Distribution::Block,
804            crate::darc::DarcMode::UnsafeArray,
805        )
806        .await;
807        unsafe {
808            local_sizes.local_as_mut_slice()[0] = local_vals.len();
809        }
810        team.async_barrier().await;
811        // local_sizes.barrier();
812        let mut size = 0;
813        let mut my_start = 0;
814        let my_pe = team.team_pe.expect("pe not part of team");
815        unsafe {
816            local_sizes
817                .buffered_onesided_iter(team.num_pes)
818                .into_stream()
819                .enumerate()
820                .for_each(|(i, local_size)| {
821                    size += local_size;
822                    if i < my_pe {
823                        my_start += local_size;
824                    }
825                    future::ready(())
826                })
827                .await;
828        }
829        let array = UnsafeArray::<T>::async_new(
830            team.clone(),
831            size,
832            distribution,
833            crate::darc::DarcMode::UnsafeArray,
834        )
835        .await;
836        if local_vals.len() > 0 {
837            unsafe { array.put(my_start, local_vals).await };
838        }
839        team.async_barrier().await;
840        array
841    }
842}
843
844impl<T: Dist + ArrayOps> TeamFrom<(&Vec<T>, Distribution)> for UnsafeArray<T> {
845    fn team_from(input: (&Vec<T>, Distribution), team: &Arc<LamellarTeam>) -> Self {
846        let (local_vals, distribution) = input;
847        let team = team.team.clone();
848        // println!("local_vals len: {:?}", local_vals.len());
849        // team.tasking_barrier();
850        let local_sizes =
851            UnsafeArray::<usize>::new(team.clone(), team.num_pes, Distribution::Block).block();
852        unsafe {
853            local_sizes.local_as_mut_slice()[0] = local_vals.len();
854        }
855        local_sizes.barrier();
856        let mut size = 0;
857        let mut my_start = 0;
858        let my_pe = team.team_pe.expect("pe not part of team");
859        unsafe {
860            local_sizes
861                .buffered_onesided_iter(team.num_pes)
862                .into_iter()
863                .enumerate()
864                .for_each(|(i, local_size)| {
865                    size += local_size;
866                    if i < my_pe {
867                        my_start += local_size;
868                    }
869                });
870        }
871        let array = UnsafeArray::<T>::new(team.clone(), size, distribution).block();
872        if local_vals.len() > 0 {
873            array.block_on(unsafe { array.put(my_start, local_vals) });
874        }
875        array.barrier();
876        array
877    }
878}
879
880// impl<T: Dist> From<AtomicArray<T>> for UnsafeArray<T> {
881//     fn from(array: AtomicArray<T>) -> Self {
882//         match array {
883//             AtomicArray::NativeAtomicArray(array) => UnsafeArray::<T>::from(array),
884//             AtomicArray::GenericAtomicArray(array) => UnsafeArray::<T>::from(array),
885//         }
886//     }
887// }
888
#[async_trait]
impl<T: Dist> AsyncFrom<AtomicArray<T>> for UnsafeArray<T> {
    /// Dispatches to the variant-specific conversion; both variants wait for
    /// outstanding operations before exposing the underlying `UnsafeArray`.
    async fn async_from(array: AtomicArray<T>) -> Self {
        match array {
            AtomicArray::NativeAtomicArray(array) => UnsafeArray::<T>::async_from(array).await,
            AtomicArray::GenericAtomicArray(array) => UnsafeArray::<T>::async_from(array).await,
        }
    }
}
898
899#[async_trait]
900impl<T: Dist> AsyncFrom<NativeAtomicArray<T>> for UnsafeArray<T> {
901    async fn async_from(array: NativeAtomicArray<T>) -> Self {
902        array
903            .array
904            .await_on_outstanding(DarcMode::UnsafeArray)
905            .await;
906        array.array
907    }
908}
909
910#[async_trait]
911impl<T: Dist> AsyncFrom<GenericAtomicArray<T>> for UnsafeArray<T> {
912    async fn async_from(array: GenericAtomicArray<T>) -> Self {
913        array
914            .array
915            .await_on_outstanding(DarcMode::UnsafeArray)
916            .await;
917        array.array
918    }
919}
920
921#[async_trait]
922impl<T: Dist> AsyncFrom<LocalLockArray<T>> for UnsafeArray<T> {
923    async fn async_from(array: LocalLockArray<T>) -> Self {
924        array
925            .array
926            .await_on_outstanding(DarcMode::UnsafeArray)
927            .await;
928        array.array
929    }
930}
931
932#[async_trait]
933impl<T: Dist> AsyncFrom<GlobalLockArray<T>> for UnsafeArray<T> {
934    async fn async_from(array: GlobalLockArray<T>) -> Self {
935        array
936            .array
937            .await_on_outstanding(DarcMode::UnsafeArray)
938            .await;
939        array.array
940    }
941}
942
943#[async_trait]
944impl<T: Dist> AsyncFrom<ReadOnlyArray<T>> for UnsafeArray<T> {
945    async fn async_from(array: ReadOnlyArray<T>) -> Self {
946        array
947            .array
948            .await_on_outstanding(DarcMode::UnsafeArray)
949            .await;
950        array.array
951    }
952}
953
954impl<T: Dist> From<UnsafeByteArray> for UnsafeArray<T> {
955    fn from(array: UnsafeByteArray) -> Self {
956        UnsafeArray {
957            inner: array.inner,
958            phantom: PhantomData,
959        }
960    }
961}
962
963impl<T: Dist> From<&UnsafeByteArray> for UnsafeArray<T> {
964    fn from(array: &UnsafeByteArray) -> Self {
965        UnsafeArray {
966            inner: array.inner.clone(),
967            phantom: PhantomData,
968        }
969    }
970}
971
972impl<T: Dist> From<UnsafeArray<T>> for UnsafeByteArray {
973    fn from(array: UnsafeArray<T>) -> Self {
974        UnsafeByteArray { inner: array.inner }
975    }
976}
977
978impl<T: Dist> From<&UnsafeArray<T>> for UnsafeByteArray {
979    fn from(array: &UnsafeArray<T>) -> Self {
980        UnsafeByteArray {
981            inner: array.inner.clone(),
982        }
983    }
984}
985
986impl<T: Dist> From<UnsafeArray<T>> for LamellarByteArray {
987    fn from(array: UnsafeArray<T>) -> Self {
988        LamellarByteArray::UnsafeArray(array.into())
989    }
990}
991
992impl<T: Dist> From<LamellarByteArray> for UnsafeArray<T> {
993    fn from(array: LamellarByteArray) -> Self {
994        if let LamellarByteArray::UnsafeArray(array) = array {
995            array.into()
996        } else {
997            panic!("Expected LamellarByteArray::UnsafeArray")
998        }
999    }
1000}
1001
impl<T: Dist> ArrayExecAm<T> for UnsafeArray<T> {
    // NOTE(review): `self.team_rt()` resolves to the inherent method of the
    // same name (inherent methods take precedence over trait methods), so this
    // is delegation, not recursion -- confirm the inherent impl exists.
    fn team_rt(&self) -> Pin<Arc<LamellarTeamRT>> {
        self.team_rt()
    }
    // Per-array AM counters, used so array-launched AMs are tracked by this
    // array's wait_all/await_all.
    fn team_counters(&self) -> Arc<AMCounters> {
        self.inner.data.array_counters.clone()
    }
}
1010impl<T: Dist> LamellarArrayPrivate<T> for UnsafeArray<T> {
1011    fn inner_array(&self) -> &UnsafeArray<T> {
1012        self
1013    }
1014    fn local_as_ptr(&self) -> *const T {
1015        self.local_as_mut_ptr()
1016    }
1017    fn local_as_mut_ptr(&self) -> *mut T {
1018        self.local_as_mut_ptr()
1019    }
1020    fn pe_for_dist_index(&self, index: usize) -> Option<usize> {
1021        self.inner.pe_for_dist_index(index)
1022    }
1023    fn pe_offset_for_dist_index(&self, pe: usize, index: usize) -> Option<usize> {
1024        if self.inner.sub {
1025            self.inner.pe_sub_offset_for_dist_index(pe, index)
1026        } else {
1027            self.inner.pe_full_offset_for_dist_index(pe, index)
1028        }
1029    }
1030
1031    unsafe fn into_inner(self) -> UnsafeArray<T> {
1032        self
1033    }
1034    fn as_lamellar_byte_array(&self) -> LamellarByteArray {
1035        self.clone().into()
1036    }
1037}
1038
impl<T: Dist> ActiveMessaging for UnsafeArray<T> {
    type SinglePeAmHandle<R: AmDist> = AmHandle<R>;
    type MultiAmHandle<R: AmDist> = MultiAmHandle<R>;
    type LocalAmHandle<L> = LocalAmHandle<L>;
    /// Executes `am` on every PE of the array's team, tracked against this
    /// array's counters so it is visible to `wait_all`/`await_all`.
    fn exec_am_all<F>(&self, am: F) -> Self::MultiAmHandle<F::Output>
    where
        F: RemoteActiveMessage + LamellarAM + Serde + AmDist,
    {
        self.inner
            .data
            .team
            .exec_am_all_tg(am, Some(self.team_counters()))
    }
    /// Executes `am` on the single PE `pe`, tracked against this array's counters.
    fn exec_am_pe<F>(&self, pe: usize, am: F) -> Self::SinglePeAmHandle<F::Output>
    where
        F: RemoteActiveMessage + LamellarAM + Serde + AmDist,
    {
        self.inner
            .data
            .team
            .exec_am_pe_tg(pe, am, Some(self.team_counters()))
    }
    /// Executes `am` locally on the calling PE, tracked against this array's counters.
    fn exec_am_local<F>(&self, am: F) -> Self::LocalAmHandle<F::Output>
    where
        F: LamellarActiveMessage + LocalAM + 'static,
    {
        self.inner
            .data
            .team
            .exec_am_local_tg(am, Some(self.team_counters()))
    }
    /// Blocks until all requests launched through this array have completed,
    /// executing scheduler tasks while polling and printing a diagnostic if
    /// the configured deadlock timeout elapses.
    fn wait_all(&self) {
        let mut temp_now = Instant::now();
        // println!(
        //     "in array wait_all  cnt: {:?} {:?} {:?}",
        //     self.inner
        //         .data
        //         .array_counters
        //         .send_req_cnt
        //         .load(Ordering::SeqCst),
        //     self.inner
        //         .data
        //         .array_counters
        //         .outstanding_reqs
        //         .load(Ordering::SeqCst),
        //     self.inner
        //         .data
        //         .array_counters
        //         .launched_req_cnt
        //         .load(Ordering::SeqCst)
        // );
        // let mut first = true;
        // Spin until both the array's outstanding-request counter and its
        // buffered-request counter drain to zero.
        while self
            .inner
            .data
            .array_counters
            .outstanding_reqs
            .load(Ordering::SeqCst)
            > 0
            || self.inner.data.req_cnt.load(Ordering::SeqCst) > 0
        {
            // std::thread::yield_now();
            // self.inner.data.team.flush();
            self.inner.data.team.scheduler.exec_task(); //might as well do useful work while we wait
            if temp_now.elapsed().as_secs_f64() > config().deadlock_timeout {
                //|| first{
                println!(
                    "in array wait_all mype: {:?} cnt: {:?} {:?} {:?}",
                    self.inner.data.team.world_pe,
                    self.inner
                        .data
                        .array_counters
                        .send_req_cnt
                        .load(Ordering::SeqCst),
                    self.inner
                        .data
                        .array_counters
                        .outstanding_reqs
                        .load(Ordering::SeqCst),
                    self.inner.data.req_cnt.load(Ordering::SeqCst)
                );
                temp_now = Instant::now();
                // first = false;
            }
        }
        // A send/launched counter mismatch means some handle was created but
        // never spawned/awaited -- warn the user about the unspawned task.
        if self
            .inner
            .data
            .array_counters
            .send_req_cnt
            .load(Ordering::SeqCst)
            != self
                .inner
                .data
                .array_counters
                .launched_req_cnt
                .load(Ordering::SeqCst)
        {
            println!(
                "in array wait_all  cnt: {:?} {:?} {:?}",
                self.inner
                    .data
                    .array_counters
                    .send_req_cnt
                    .load(Ordering::SeqCst),
                self.inner
                    .data
                    .array_counters
                    .outstanding_reqs
                    .load(Ordering::SeqCst),
                self.inner
                    .data
                    .array_counters
                    .launched_req_cnt
                    .load(Ordering::SeqCst)
            );
            RuntimeWarning::UnspawnedTask(
                "`wait_all` on an array before all operations, iterators, etc, created by the array have been spawned",
            )
            .print();
        }
        self.inner.data.task_group.wait_all();
    }
    // NOTE(review): resolves to the inherent `await_all` (inherent methods
    // take precedence over trait methods), so this is delegation, not
    // recursion -- confirm the inherent impl exists.
    fn await_all(&self) -> impl Future<Output = ()> + Send {
        self.await_all()
    }
    fn barrier(&self) {
        self.inner.data.team.barrier()
    }
    fn async_barrier(&self) -> BarrierHandle {
        self.inner.data.team.async_barrier()
    }
    /// Spawns `f` on the team's scheduler; world, team, and array counters are
    /// all incremented so each scope's `wait_all` accounts for the task.
    fn spawn<F: Future>(&self, f: F) -> LamellarTask<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send,
    {
        self.inner.data.team.scheduler.spawn_task(
            f,
            vec![
                self.inner.data.team.world_counters.clone(),
                self.inner.data.team.team_counters.clone(),
                self.inner.data.array_counters.clone(),
            ],
        )
    }
    fn block_on<F: Future>(&self, f: F) -> F::Output {
        self.inner.data.team.scheduler.block_on(f)
    }
    fn block_on_all<I>(&self, iter: I) -> Vec<<<I as IntoIterator>::Item as Future>::Output>
    where
        I: IntoIterator,
        <I as IntoIterator>::Item: Future + Send + 'static,
        <<I as IntoIterator>::Item as Future>::Output: Send,
    {
        self.inner.data.team.block_on_all(iter)
    }
}
1197
1198impl<T: Dist> LamellarArray<T> for UnsafeArray<T> {
1199    fn len(&self) -> usize {
1200        self.inner.size
1201    }
1202
1203    fn num_elems_local(&self) -> usize {
1204        self.inner.num_elems_local()
1205    }
1206    //#[tracing::instrument(skip_all)]
1207    fn pe_and_offset_for_global_index(&self, index: usize) -> Option<(usize, usize)> {
1208        if self.inner.sub {
1209            // println!("sub array {index}");
1210            let pe = self.inner.pe_for_dist_index(index)?;
1211            // println!("pe: {pe}");
1212            let offset = self.inner.pe_sub_offset_for_dist_index(pe, index)?;
1213            // println!(
1214            //     "sub array index {index} pe {pe} offset {offset} size {} {} {}",
1215            //     self.inner.size,
1216            //     self.inner.num_elems_pe(0),
1217            //     self.inner.num_elems_pe(1)
1218            // );
1219            Some((pe, offset))
1220        } else {
1221            self.inner.full_pe_and_offset_for_global_index(index)
1222        }
1223    }
1224
1225    fn first_global_index_for_pe(&self, pe: usize) -> Option<usize> {
1226        self.inner.start_index_for_pe(pe)
1227    }
1228
1229    fn last_global_index_for_pe(&self, pe: usize) -> Option<usize> {
1230        self.inner.end_index_for_pe(pe)
1231    }
1232}
1233
1234impl<T: Dist> LamellarEnv for UnsafeArray<T> {
1235    fn my_pe(&self) -> usize {
1236        self.inner.data.my_pe
1237    }
1238
1239    fn num_pes(&self) -> usize {
1240        self.inner.data.num_pes
1241    }
1242
1243    fn num_threads_per_pe(&self) -> usize {
1244        self.inner.data.team.num_threads()
1245    }
1246    fn world(&self) -> Arc<LamellarTeam> {
1247        self.inner.data.team.world()
1248    }
1249    fn team(&self) -> Arc<LamellarTeam> {
1250        self.inner.data.team.team()
1251    }
1252}
1253
// Marker trait impls: UnsafeArray places no restrictions on reads or writes.
impl<T: Dist> LamellarWrite for UnsafeArray<T> {}
impl<T: Dist> LamellarRead for UnsafeArray<T> {}
1256
1257impl<T: Dist> SubArray<T> for UnsafeArray<T> {
1258    type Array = UnsafeArray<T>;
1259    fn sub_array<R: std::ops::RangeBounds<usize>>(&self, range: R) -> Self::Array {
1260        let start = match range.start_bound() {
1261            //inclusive
1262            Bound::Included(idx) => *idx,
1263            Bound::Excluded(idx) => *idx + 1,
1264            Bound::Unbounded => 0,
1265        };
1266        let end = match range.end_bound() {
1267            //exclusive
1268            Bound::Included(idx) => *idx + 1,
1269            Bound::Excluded(idx) => *idx,
1270            Bound::Unbounded => self.inner.size,
1271        };
1272        if end > self.inner.size {
1273            panic!(
1274                "subregion range ({:?}-{:?}) exceeds size of array {:?}",
1275                start, end, self.inner.size
1276            );
1277        }
1278        // println!(
1279        //     "new inner start {:?} end {:?} size {:?} cur offset {:?} cur size {:?}",
1280        //     start,
1281        //     end,
1282        //     end - start,
1283        //     self.inner.offset,
1284        //     self.inner.size
1285        // );
1286        let mut inner = self.inner.clone();
1287        inner.offset += start;
1288        inner.size = end - start;
1289        inner.sub = true;
1290        UnsafeArray {
1291            inner: inner,
1292            phantom: PhantomData,
1293        }
1294    }
1295    fn global_index(&self, sub_index: usize) -> usize {
1296        self.inner.offset + sub_index
1297    }
1298}
1299
1300impl<T: Dist + std::fmt::Debug> UnsafeArray<T> {
1301    #[doc(alias = "Collective")]
1302    /// Print the data within a lamellar array
1303    ///
1304    /// # Collective Operation
1305    /// Requires all PEs associated with the array to enter the print call otherwise deadlock will occur (i.e. barriers are being called internally)
1306    ///
1307    /// # Examples
1308    ///```
1309    /// use lamellar::array::prelude::*;
1310    /// let world = LamellarWorldBuilder::new().build();
1311    /// let block_array = UnsafeArray::<usize>::new(&world,100,Distribution::Block).block();
1312    /// let cyclic_array = UnsafeArray::<usize>::new(&world,100,Distribution::Block).block();
1313    ///
1314    /// unsafe{
1315    ///     let _ =block_array.dist_iter_mut().enumerate().for_each(move |(i,elem)| {
1316    ///         *elem = i;
1317    ///     }).spawn();
1318    ///     let _ = cyclic_array.dist_iter_mut().enumerate().for_each(move |(i,elem)| {
1319    ///         *elem = i;
1320    ///     }).spawn();
1321    /// }
1322    /// world.wait_all();
1323    /// block_array.print();
1324    /// println!();
1325    /// cyclic_array.print();
1326    ///```
1327    pub fn print(&self) {
1328        <UnsafeArray<T> as ArrayPrint<T>>::print(&self);
1329    }
1330}
1331
impl<T: Dist + std::fmt::Debug> ArrayPrint<T> for UnsafeArray<T> {
    /// Collectively prints each PE's local data, one PE at a time in rank
    /// order; barriers serialize the turns.
    fn print(&self) {
        self.inner.data.team.tasking_barrier(); //TODO: have barrier accept a string so we can print where we are stalling.
        for pe in 0..self.inner.data.team.num_pes() {
            self.inner.data.team.tasking_barrier();
            if self.inner.data.my_pe == pe {
                // SAFETY: read-only view of local data; concurrent writers
                // would make this racy (inherent to UnsafeArray).
                println!("[pe {:?} data] {:?}", pe, unsafe { self.local_as_slice() });
            }
            // Presumably gives stdout time to flush so output stays in rank
            // order -- best-effort only.
            std::thread::sleep(std::time::Duration::from_millis(100));
        }
    }
}
1344
1345impl<T: Dist + AmDist + 'static> UnsafeArray<T> {
1346    pub(crate) fn get_reduction_op(
1347        &self,
1348        op: &str,
1349        byte_array: LamellarByteArray,
1350    ) -> LamellarArcAm {
1351        REDUCE_OPS
1352            .get(&(std::any::TypeId::of::<T>(), op))
1353            .expect("unexpected reduction type")(byte_array, self.inner.data.team.num_pes())
1354    }
1355    pub(crate) fn reduce_data(
1356        &self,
1357        op: &str,
1358        byte_array: LamellarByteArray,
1359    ) -> AmHandle<Option<T>> {
1360        let func = self.get_reduction_op(op, byte_array);
1361        if let Ok(my_pe) = self.inner.data.team.team_pe_id() {
1362            self.inner.data.team.exec_arc_am_pe::<Option<T>>(
1363                my_pe,
1364                func,
1365                Some(self.inner.data.array_counters.clone()),
1366            )
1367        } else {
1368            self.inner.data.team.exec_arc_am_pe::<Option<T>>(
1369                0,
1370                func,
1371                Some(self.inner.data.array_counters.clone()),
1372            )
1373        }
1374    }
1375}
1376
// This is essentially impl LamellarArrayReduce, but we mean to explicitly have UnsafeArray expose unsafe functions
impl<T: Dist + AmDist + 'static> UnsafeArray<T> {
    #[doc(alias("One-sided", "onesided"))]
    /// Perform a reduction on the entire distributed array, returning the value to the calling PE.
    ///
    /// Please see the documentation for the [register_reduction] procedural macro for
    /// more details and examples on how to create your own reductions.
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's or local threads may access this PE's local data.
    /// Any updates to local data are not guaranteed to be Atomic.
    ///
    /// # One-sided Operation
    /// The calling PE is responsible for launching `Reduce` active messages on the other PEs associated with the array.
    /// the returned reduction result is only available on the calling PE
    /// # Note
    /// The future returned by this function is lazy and does nothing unless awaited, [spawned][AmHandle::spawn] or [blocked on][AmHandle::block]
    /// # Examples
    /// ```
    /// use lamellar::array::prelude::*;
    /// use rand::Rng;
    /// let world = LamellarWorldBuilder::new().build();
    /// let num_pes = world.num_pes();
    /// let array = UnsafeArray::<usize>::new(&world,1000000,Distribution::Block).block();
    /// let array_clone = array.clone();
    /// unsafe { // THIS IS NOT SAFE -- we are randomly updating elements, no protections, updates may be lost... DONT DO THIS
    ///     let req = array.local_iter().for_each(move |_| {
    ///         let index = rand::thread_rng().gen_range(0..array_clone.len());
    ///         let _ = array_clone.add(index,1).spawn(); //randomly add one to an element in the array.
    ///     }).spawn();
    /// }
    /// array.wait_all();
    /// array.barrier();
    /// let sum = unsafe {array.reduce("sum").block()}; // equivalent to calling array.sum()
    /// //assert_eq!(array.len()*num_pes,sum); // may or may not fail
    ///```
    #[must_use = "this function is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
    pub unsafe fn reduce(&self, op: &str) -> AmHandle<Option<T>> {
        self.reduce_data(op, self.clone().into())
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Perform a sum reduction on the entire distributed array, returning the value to the calling PE.
    ///
    /// This is equivalent to `reduce("sum")`.
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's or local threads may access this PE's local data.
    /// Any updates to local data are not guaranteed to be Atomic.
    ///
    /// # One-sided Operation
    /// The calling PE is responsible for launching `Sum` active messages on the other PEs associated with the array.
    /// the returned sum reduction result is only available on the calling PE
    /// # Note
    /// The future returned by this function is lazy and does nothing unless awaited, [spawned][AmHandle::spawn] or [blocked on][AmHandle::block]
    /// # Examples
    /// ```
    /// use lamellar::array::prelude::*;
    /// use rand::Rng;
    /// let world = LamellarWorldBuilder::new().build();
    /// let num_pes = world.num_pes();
    /// let array = UnsafeArray::<usize>::new(&world,1000000,Distribution::Block).block();
    /// let array_clone = array.clone();
    /// unsafe { // THIS IS NOT SAFE -- we are randomly updating elements, no protections, updates may be lost... DONT DO THIS
    ///     let req = array.local_iter().for_each(move |_| {
    ///         let index = rand::thread_rng().gen_range(0..array_clone.len());
    ///         let _ = array_clone.add(index,1).spawn(); //randomly add one to an element in the array.
    ///     }).spawn();
    /// }
    /// array.wait_all();
    /// array.barrier();
    /// let sum = unsafe{array.sum().block()}; //Safe in this instance as we have ensured no updates are currently happening
    /// // assert_eq!(array.len()*num_pes,sum);//this may or may not fail
    ///```
    #[must_use = "this function is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
    pub unsafe fn sum(&self) -> AmHandle<Option<T>> {
        self.reduce("sum")
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Perform a product reduction on the entire distributed array, returning the value to the calling PE.
    ///
    /// This is equivalent to `reduce("prod")`.
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's or local threads may access this PE's local data.
    /// Any updates to local data are not guaranteed to be Atomic.
    ///
    /// # One-sided Operation
    /// The calling PE is responsible for launching `Prod` active messages on the other PEs associated with the array.
    /// the returned prod reduction result is only available on the calling PE
    /// # Note
    /// The future returned by this function is lazy and does nothing unless awaited, [spawned][AmHandle::spawn] or [blocked on][AmHandle::block]
    /// # Examples
    /// ```
    /// use lamellar::array::prelude::*;
    /// use rand::Rng;
    /// let world = LamellarWorldBuilder::new().build();
    /// let num_pes = world.num_pes();
    /// let array = UnsafeArray::<usize>::new(&world,10,Distribution::Block).block();
    /// unsafe {
    ///     let req = array.dist_iter_mut().enumerate().for_each(move |(i,elem)| {
    ///         *elem = i+1;
    ///     }).spawn();
    /// }
    /// array.print();
    /// array.wait_all();
    /// array.print();
    /// let prod = unsafe{ array.prod().block().expect("array len > 0")};
    /// assert_eq!((1..=array.len()).product::<usize>(),prod);
    ///```
    #[must_use = "this function is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
    pub unsafe fn prod(&self) -> AmHandle<Option<T>> {
        self.reduce("prod")
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Find the max element in the entire distributed array, returning to the calling PE
    ///
    /// This is equivalent to `reduce("max")`.
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's or local threads may access this PE's local data.
    /// Any updates to local data are not guaranteed to be Atomic.
    ///
    /// # One-sided Operation
    /// The calling PE is responsible for launching `Max` active messages on the other PEs associated with the array.
    /// the returned max reduction result is only available on the calling PE
    /// # Note
    /// The future returned by this function is lazy and does nothing unless awaited, [spawned][AmHandle::spawn] or [blocked on][AmHandle::block]
    /// # Examples
    /// ```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let num_pes = world.num_pes();
    /// let array = UnsafeArray::<usize>::new(&world,10,Distribution::Block).block();
    /// let array_clone = array.clone();
    /// let _ = unsafe{array.dist_iter_mut().enumerate().for_each(|(i,elem)| *elem = i*2).spawn()}; //safe as we are accessing in a data parallel fashion
    /// array.wait_all();
    /// array.barrier();
    /// let max_req = unsafe{array.max()}; //Safe in this instance as we have ensured no updates are currently happening
    /// let max = max_req.block().expect("array len > 0");
    /// assert_eq!((array.len()-1)*2,max);
    ///```
    #[must_use = "this function is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
    pub unsafe fn max(&self) -> AmHandle<Option<T>> {
        self.reduce("max")
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Find the min element in the entire distributed array, returning to the calling PE
    ///
    /// This is equivalent to `reduce("min")`.
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's or local threads may access this PE's local data.
    /// Any updates to local data are not guaranteed to be Atomic.
    ///
    /// # One-sided Operation
    /// The calling PE is responsible for launching `Min` active messages on the other PEs associated with the array.
    /// the returned min reduction result is only available on the calling PE
    /// # Note
    /// The future returned by this function is lazy and does nothing unless awaited, [spawned][AmHandle::spawn] or [blocked on][AmHandle::block]
    /// # Examples
    /// ```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let num_pes = world.num_pes();
    /// let array = UnsafeArray::<usize>::new(&world,10,Distribution::Block).block();
    /// let array_clone = array.clone();
    /// let _ = unsafe{array.dist_iter_mut().enumerate().for_each(|(i,elem)| *elem = i*2).spawn()}; //safe as we are accessing in a data parallel fashion
    /// array.wait_all();
    /// array.barrier();
    /// let min_req = unsafe{array.min()}; //Safe in this instance as we have ensured no updates are currently happening
    /// let min = min_req.block().expect("array len > 0");
    /// assert_eq!(0,min);
    ///```
    #[must_use = "this function is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
    pub unsafe fn min(&self) -> AmHandle<Option<T>> {
        self.reduce("min")
    }
}
1559
1560impl UnsafeArrayInnerWeak {
1561    pub(crate) fn upgrade(&self) -> Option<UnsafeArrayInner> {
1562        if let Some(data) = self.data.upgrade() {
1563            Some(UnsafeArrayInner {
1564                data: data,
1565                distribution: self.distribution.clone(),
1566                orig_elem_per_pe: self.orig_elem_per_pe,
1567                orig_remaining_elems: self.orig_remaining_elems,
1568                elem_size: self.elem_size,
1569                offset: self.offset,
1570                size: self.size,
1571                sub: self.sub,
1572            })
1573        } else {
1574            None
1575        }
1576    }
1577}
1578
impl UnsafeArrayInner {
    /// Spawn `f` on the team's scheduler, registering the world, team, and
    /// array AM counters so the task is tracked by outstanding-request accounting.
    pub(crate) fn spawn<F: Future>(&self, f: F) -> LamellarTask<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send,
    {
        self.data.team.scheduler.spawn_task(
            f,
            vec![
                self.data.team.world_counters.clone(),
                self.data.team.team_counters.clone(),
                self.data.array_counters.clone(),
            ],
        )
    }
    /// Drive `f` to completion on the current thread via the team's scheduler.
    pub(crate) fn block_on<F: Future>(&self, f: F) -> F::Output {
        self.data.team.scheduler.block_on(f)
    }
    /// Create a weak handle to `array` by downgrading its backing `Darc`;
    /// all layout metadata is copied so the handle can be upgraded later.
    pub(crate) fn downgrade(array: &UnsafeArrayInner) -> UnsafeArrayInnerWeak {
        UnsafeArrayInnerWeak {
            data: Darc::downgrade(&array.data),
            distribution: array.distribution.clone(),
            orig_elem_per_pe: array.orig_elem_per_pe,
            orig_remaining_elems: array.orig_remaining_elems,
            elem_size: array.elem_size,
            offset: array.offset,
            size: array.size,
            sub: array.sub,
        }
    }

    /// Map a global (full-array) index to `(pe, offset-on-that-pe)`.
    /// Returns `None` if `index >= self.size`.
    pub(crate) fn full_pe_and_offset_for_global_index(
        &self,
        index: usize,
    ) -> Option<(usize, usize)> {
        if self.size > index {
            let global_index = index;
            match self.distribution {
                Distribution::Block => {
                    // Total element count held by the PEs that carry one extra element
                    // (the first `orig_remaining_elems` PEs each hold `orig_elem_per_pe + 1`).
                    let rem_index = self.orig_remaining_elems * (self.orig_elem_per_pe + 1);

                    let (pe, offset) = if global_index < rem_index {
                        //index is on a pe with extra elems
                        let pe = global_index / (self.orig_elem_per_pe + 1); // accounts for the remaining elems
                        let offset = global_index - (pe * (self.orig_elem_per_pe + 1));
                        (pe, offset)
                    } else {
                        //index is on a pe without extra elems
                        let temp_index = global_index - rem_index; //get the remaining index after accounting for PEs with extra elements
                        let temp_pe = temp_index / self.orig_elem_per_pe; //the pe after accounting for PEs with extra elements
                        let pe = self.orig_remaining_elems  // N pes that have extra elements
                            + temp_pe;
                        let offset = temp_index - (temp_pe * self.orig_elem_per_pe);
                        (pe, offset)
                    };
                    Some((pe, offset))
                }
                Distribution::Cyclic => {
                    // Round-robin layout: element i lives on pe i % num_pes, slot i / num_pes.
                    let res = Some((
                        global_index % self.data.num_pes,
                        global_index / self.data.num_pes,
                    ));
                    res
                }
            }
        } else {
            None
        }
    }

    /// Return the PE that owns (sub)array index `index`, or `None` if out of bounds.
    //index is relative to (sub)array (i.e. index=0 doesn't necessarily live on pe=0)
    // //#[tracing::instrument(skip_all)]
    pub(crate) fn pe_for_dist_index(&self, index: usize) -> Option<usize> {
        // println!("pe_for_dist_index {index} {}", self.size);
        if self.size > index {
            let global_index = index + self.offset;

            match self.distribution {
                Distribution::Block => {
                    let rem_index = self.orig_remaining_elems * (self.orig_elem_per_pe + 1);
                    let pe = if global_index < rem_index {
                        global_index / (self.orig_elem_per_pe + 1) // accounts for the remaining elems
                    } else {
                        self.orig_remaining_elems  // N pes that have extra elements
                            + ((global_index - rem_index) //get the remaining index after accounting for PEs with extra elements
                            / self.orig_elem_per_pe)
                    };
                    Some(pe)
                }
                Distribution::Cyclic => Some(global_index % self.data.num_pes),
            }
        } else {
            None
        }
    }

    /// Translate a (sub)array index into the element offset within `pe`'s local
    /// portion of the *full* array; for Cyclic, `None` if `index` does not live on `pe`.
    //index relative to subarray, return offset relative to subarray
    // //#[tracing::instrument(skip_all)]
    pub(crate) fn pe_full_offset_for_dist_index(&self, pe: usize, index: usize) -> Option<usize> {
        // println!("pe_full_offset_for_dist_index pe {pe} index {index}");
        let global_index = self.offset + index;
        match self.distribution {
            Distribution::Block => {
                let rem_index = self.orig_remaining_elems * (self.orig_elem_per_pe + 1);
                // println!("\tindex: {index} offset {} size {} global_index {global_index} rem_index {rem_index}",self.offset, self.size);
                let offset = if global_index < rem_index {
                    //index is on a pe with extra elems
                    global_index - (pe * (self.orig_elem_per_pe + 1))
                } else {
                    //index is on a pe without extra elems
                    let temp_index = global_index - rem_index; //get the remaining index after accounting for PEs with extra elements
                    let temp_pe = temp_index / self.orig_elem_per_pe; //the pe after accounting for PEs with extra elements

                    temp_index - (temp_pe * self.orig_elem_per_pe)
                };
                Some(offset)
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                if global_index % num_pes == pe {
                    Some(index / num_pes)
                } else {
                    None
                }
            }
        }
    }

    /// Translate a (sub)array index into the element offset within `pe`'s local
    /// portion of the *sub*array; `None` if the index does not live on `pe`.
    //index relative to subarray, return offset relative to subarray
    pub(crate) fn pe_sub_offset_for_dist_index(&self, pe: usize, index: usize) -> Option<usize> {
        // println!(
        //     "pe_sub_offset_for_dist_index index {index} pe {pe} offset {}",
        //     self.offset
        // );
        let start_pe = self.pe_for_dist_index(0)?;

        match self.distribution {
            Distribution::Block => {
                if start_pe == pe {
                    // subarray starts on this pe, so the subarray-relative index is
                    // already the offset within this pe's portion of the subarray
                    if index < self.size {
                        Some(index)
                    } else {
                        None
                    }
                } else {
                    self.pe_full_offset_for_dist_index(pe, index)
                }
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                if (index + self.offset) % num_pes == pe {
                    Some(index / num_pes)
                } else {
                    None
                }
            }
        }
    }

    //index is local with respect to subarray
    //returns local offset relative to full array
    // //#[tracing::instrument(skip_all)]
    pub(crate) fn pe_full_offset_for_local_index(&self, pe: usize, index: usize) -> Option<usize> {
        let global_index = self.global_index_from_local(index)?;
        match self.distribution {
            Distribution::Block => {
                // [pe_start_index, pe_end_index) is the global range owned by `pe`
                let pe_start_index = self.global_start_index_for_pe(pe);
                let mut pe_end_index = pe_start_index + self.orig_elem_per_pe;
                if pe < self.orig_remaining_elems {
                    pe_end_index += 1; // this pe holds one of the leftover elements
                }
                if pe_start_index <= global_index && global_index < pe_end_index {
                    Some(global_index - pe_start_index)
                } else {
                    None
                }
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                if global_index % num_pes == pe {
                    Some(global_index / num_pes)
                } else {
                    None
                }
            }
        }
    }

    //index is local with respect to subarray
    //returns index with respect to original full length array
    // //#[tracing::instrument(skip_all)]
    pub(crate) fn global_index_from_local(&self, index: usize) -> Option<usize> {
        let my_pe = self.data.my_pe;
        match self.distribution {
            Distribution::Block => {
                let global_start = self.global_start_index_for_pe(my_pe);
                // signed: the subarray offset may begin after this pe's global start
                let start = global_start as isize - self.offset as isize;
                if start >= 0 {
                    //the (sub)array starts before my pe
                    if (start as usize) < self.size {
                        //sub(array) exists on my node
                        Some(global_start as usize + index)
                    } else {
                        //sub array does not exist on my node
                        None
                    }
                } else {
                    //inner starts on or after my pe
                    let mut global_end = global_start + self.orig_elem_per_pe;
                    if my_pe < self.orig_remaining_elems {
                        global_end += 1;
                    }
                    if self.offset < global_end {
                        //the (sub)array starts on my pe
                        Some(self.offset + index)
                    } else {
                        //the (sub)array starts after my pe
                        None
                    }
                }
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                let start_pe = match self.pe_for_dist_index(0) {
                    Some(i) => i,
                    None => panic!("index 0 out of bounds for array of length {:?}", self.size),
                };
                let end_pe = match self.pe_for_dist_index(self.size - 1) {
                    Some(i) => i,
                    None => panic!(
                        "index {:?} out of bounds for array of length {:?}",
                        self.size - 1,
                        self.size
                    ),
                };

                // base count; some PEs may hold one extra element (handled below)
                let mut num_elems = self.size / num_pes;
                // println!("{:?} {:?} {:?} {:?}",num_pes,start_pe,end_pe,num_elems);

                if self.size % num_pes != 0 {
                    //we have leftover elements
                    if start_pe <= end_pe {
                        //no wrap around occurs
                        if start_pe <= my_pe && my_pe <= end_pe {
                            num_elems += 1;
                        }
                    } else {
                        //wrap around occurs
                        if start_pe <= my_pe || my_pe <= end_pe {
                            num_elems += 1;
                        }
                    }
                }
                // println!("{:?} {:?} {:?} {:?}",num_pes,start_pe,end_pe,num_elems);

                if index < num_elems {
                    if start_pe <= my_pe {
                        Some(num_pes * index + self.offset + (my_pe - start_pe))
                    } else {
                        // start pe is "behind" us in the round-robin order
                        Some(num_pes * index + self.offset + (num_pes - start_pe) + my_pe)
                    }
                } else {
                    None
                }
            }
        }
    }

    //index is local with respect to subarray
    //returns index with respect to subarray
    // //#[tracing::instrument(skip_all)]
    pub(crate) fn subarray_index_from_local(&self, index: usize) -> Option<usize> {
        let my_pe = self.data.my_pe;
        let my_start_index = self.start_index_for_pe(my_pe)?; //None means subarray doesn't exist on this PE
        match self.distribution {
            Distribution::Block => {
                if my_start_index + index < self.size {
                    //local index is in subarray
                    Some(my_start_index + index)
                } else {
                    //local index outside subarray
                    None
                }
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                let num_elems_local = self.num_elems_local();
                if index < num_elems_local {
                    //local index is in subarray
                    Some(my_start_index + num_pes * index)
                } else {
                    //local index outside subarray
                    None
                }
            }
        }
    }

    // return index relative to the full array
    pub(crate) fn global_start_index_for_pe(&self, pe: usize) -> usize {
        match self.distribution {
            Distribution::Block => {
                let global_start = self.orig_elem_per_pe * pe;
                // each of the first `orig_remaining_elems` PEs holds one extra element
                global_start + std::cmp::min(pe, self.orig_remaining_elems)
            }
            Distribution::Cyclic => pe,
        }
    }
    //return index relative to the subarray
    // //#[tracing::instrument(skip_all)]
    pub(crate) fn start_index_for_pe(&self, pe: usize) -> Option<usize> {
        match self.distribution {
            Distribution::Block => {
                let global_start = self.global_start_index_for_pe(pe);
                let start = global_start as isize - self.offset as isize;
                if start >= 0 {
                    //the (sub)array starts before my pe
                    if (start as usize) < self.size {
                        //sub(array) exists on my node
                        Some(start as usize)
                    } else {
                        //sub array does not exist on my node
                        None
                    }
                } else {
                    let mut global_end = global_start + self.orig_elem_per_pe;
                    if pe < self.orig_remaining_elems {
                        global_end += 1;
                    }
                    if self.offset < global_end {
                        //the (sub)array starts on my pe
                        Some(0)
                    } else {
                        //the (sub)array starts after my pe
                        None
                    }
                }
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                if let Some(start_pe) = self.pe_for_dist_index(0) {
                    let temp_len = if self.size < num_pes {
                        //sub array might not exist on my array
                        self.size
                    } else {
                        num_pes
                    };
                    // walk the round-robin order until we hit `pe`
                    for i in 0..temp_len {
                        if (i + start_pe) % num_pes == pe {
                            return Some(i);
                        }
                    }
                }
                None
            }
        }
    }

    #[allow(dead_code)]
    /// Exclusive global end index of `pe`'s portion (start + element count).
    pub(crate) fn global_end_index_for_pe(&self, pe: usize) -> usize {
        self.global_start_index_for_pe(pe) + self.num_elems_pe(pe)
    }

    //return index relative to the subarray
    // //#[tracing::instrument(skip_all)]
    pub(crate) fn end_index_for_pe(&self, pe: usize) -> Option<usize> {
        let start_i = self.start_index_for_pe(pe)?;
        match self.distribution {
            Distribution::Block => {
                //(sub)array ends on our pe
                if pe == self.pe_for_dist_index(self.size - 1)? {
                    Some(self.size - 1)
                } else {
                    // (sub)array ends on another pe
                    Some(self.start_index_for_pe(pe + 1)? - 1)
                }
            }
            Distribution::Cyclic => {
                let num_elems = self.num_elems_pe(pe);
                let num_pes = self.data.num_pes;
                // last subarray index owned by `pe` (stride num_pes between its elements)
                let end_i = start_i + (num_elems - 1) * num_pes;
                Some(end_i)
            }
        }
    }

    /// Number of (sub)array elements that live on `pe` (0 if none).
    // //#[tracing::instrument(skip_all)]
    pub(crate) fn num_elems_pe(&self, pe: usize) -> usize {
        match self.distribution {
            Distribution::Block => {
                if let Some(start_i) = self.start_index_for_pe(pe) {
                    //inner starts before or on pe
                    let end_i = if let Some(end_i) = self.start_index_for_pe(pe + 1) {
                        //inner ends after pe
                        end_i
                    } else {
                        //inner ends on pe
                        self.size
                    };
                    // println!("num_elems_pe pe {:?} si {:?} ei {:?}",pe,start_i,end_i);
                    end_i - start_i
                } else {
                    0
                }
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                if let Some(start_pe) = self.pe_for_dist_index(0) {
                    let end_pe = match self.pe_for_dist_index(self.size - 1) {
                        Some(i) => i,
                        None => panic!(
                            "index {:?} out of bounds for array of length {:?}",
                            self.size - 1,
                            self.size
                        ),
                    }; //inclusive
                    let mut num_elems = self.size / num_pes;
                    if self.size % num_pes != 0 {
                        //we have left over elements
                        if start_pe <= end_pe {
                            //no wrap around occurs
                            if pe >= start_pe && pe <= end_pe {
                                num_elems += 1
                            }
                        } else {
                            //wrap around occurs
                            if pe >= start_pe || pe <= end_pe {
                                num_elems += 1
                            }
                        }
                    }
                    num_elems
                } else {
                    0
                }
            }
        }
    }
    /// Number of (sub)array elements that live on the calling PE.
    // //#[tracing::instrument(skip_all)]
    pub(crate) fn num_elems_local(&self) -> usize {
        self.num_elems_pe(self.data.my_pe)
    }

    /// Mutable byte slice over this PE's local portion of the (sub)array.
    ///
    /// # Safety
    /// No synchronization is performed; callers must ensure no conflicting
    /// concurrent access (local threads or remote PEs) to the returned bytes.
    // //#[tracing::instrument(skip_all)]
    pub(crate) unsafe fn local_as_mut_slice(&self) -> &mut [u8] {
        let slice =
            self.data.mem_region.as_casted_mut_slice::<u8>().expect(
                "memory doesnt exist on this pe (this should not happen for arrays currently)",
            );
        // let len = self.size;
        let my_pe = self.data.my_pe;
        let num_pes = self.data.num_pes;
        let num_elems_local = self.num_elems_local();
        match self.distribution {
            Distribution::Block => {
                let start_pe = self
                    .pe_for_dist_index(0)
                    .expect("array len should be greater than 0"); //index is relative to inner
                                                                   // let end_pe = self.pe_for_dist_index(len-1).unwrap();
                                                                   // println!("spe {:?} epe {:?}",start_pe,end_pe);
                let start_index = if my_pe == start_pe {
                    //inner starts on my pe
                    let global_start = self.global_start_index_for_pe(my_pe);
                    self.offset - global_start
                } else {
                    0
                };
                let end_index = start_index + num_elems_local;
                // println!(
                //     "nel {:?} sao {:?} as slice si: {:?} ei {:?} elemsize {:?}",
                //     num_elems_local, self.offset, start_index, end_index, self.elem_size
                // );
                &mut slice[start_index * self.elem_size..end_index * self.elem_size]
            }
            Distribution::Cyclic => {
                let global_index = self.offset;
                // first local slot at-or-after the subarray's starting global index
                let start_index = global_index / num_pes
                    + if my_pe >= global_index % num_pes {
                        0
                    } else {
                        1
                    };
                let end_index = start_index + num_elems_local;
                // println!("si {:?}  ei {:?}",start_index,end_index);
                &mut slice[start_index * self.elem_size..end_index * self.elem_size]
            }
        }
    }

    /// Raw mutable pointer to the start of this PE's local portion of the (sub)array.
    ///
    /// # Safety
    /// Same contract as [`local_as_mut_slice`](Self::local_as_mut_slice): no
    /// synchronization is performed, and the caller must stay within the local bounds.
    // //#[tracing::instrument(skip_all)]
    pub(crate) unsafe fn local_as_mut_ptr(&self) -> *mut u8 {
        let ptr =
            self.data.mem_region.as_casted_mut_ptr::<u8>().expect(
                "memory doesnt exist on this pe (this should not happen for arrays currently)",
            );
        // println!("u8 ptr: {:?}", ptr);
        // let len = self.size;
        let my_pe = self.data.my_pe;
        let num_pes = self.data.num_pes;
        // let num_elems_local = self.num_elems_local();
        match self.distribution {
            Distribution::Block => {
                let start_pe = self
                    .pe_for_dist_index(0)
                    .expect("array len should be greater than 0"); //index is relative to inner
                                                                   // let end_pe = self.pe_for_dist_index(len-1).unwrap();
                                                                   // println!("spe {:?} epe {:?}",start_pe,end_pe);
                let start_index = if my_pe == start_pe {
                    //inner starts on my pe
                    let global_start = self.global_start_index_for_pe(my_pe);
                    self.offset - global_start
                } else {
                    0
                };

                // println!("nel {:?} sao {:?} as slice si: {:?} ei {:?}",num_elems_local,self.offset,start_index,end_index);
                ptr.offset((start_index * self.elem_size) as isize)
            }
            Distribution::Cyclic => {
                let global_index = self.offset;
                // first local slot at-or-after the subarray's starting global index
                let start_index = global_index / num_pes
                    + if my_pe >= global_index % num_pes {
                        0
                    } else {
                        1
                    };
                // println!("si {:?}  ei {:?}",start_index,end_index);
                ptr.offset((start_index * self.elem_size) as isize)
            }
        }
    }

    /// Obtain a handle for the team-wide barrier associated with this array.
    pub(crate) fn barrier_handle(&self) -> BarrierHandle {
        self.data.team.barrier.barrier_handle()
    }
}
2115
#[cfg(test)]
mod tests {
    // Cross-checks a floating-point formula for mapping an element index to its
    // owning PE against an explicitly constructed element -> PE table, over many
    // (num_pes, len) combinations.
    #[test]
    fn pe_for_dist_index() {
        for num_pes in 2..200 {
            println!("num_pes {:?}", num_pes);
            for len in num_pes..2000 {
                // Ground truth: pe p owns round((p+1)*epp) - round(p*epp)
                // consecutive elements, where epp = len / num_pes.
                let mut elems_per_pe = vec![0; num_pes];
                let mut pe_for_elem = vec![0; len];
                let epp = len as f32 / num_pes as f32;
                let mut cur_elem = 0;
                for pe in 0..num_pes {
                    elems_per_pe[pe] =
                        (((pe + 1) as f32 * epp).round() - (pe as f32 * epp).round()) as usize;
                    for _i in 0..elems_per_pe[pe] {
                        pe_for_elem[cur_elem] = pe;
                        cur_elem += 1;
                    }
                }
                for elem in 0..len {
                    //the actual calculation
                    let mut calc_pe = (((elem) as f32 / epp).floor()) as usize;
                    let end_i = (epp * (calc_pe + 1) as f32).round() as usize;
                    // floor() can land one PE early near a rounding boundary; bump if so
                    if elem >= end_i {
                        calc_pe += 1;
                    }
                    //--------------------
                    if calc_pe != pe_for_elem[elem] {
                        // dump diagnostics before the assert fires
                        println!(
                            "npe: {:?} len {:?} e: {:?} eep: {:?} cpe: {:?}  ei {:?}",
                            num_pes,
                            len,
                            elem,
                            epp,
                            ((elem) as f32 / epp),
                            end_i
                        );
                        println!("{:?}", elems_per_pe);
                        println!("{:?}", pe_for_elem);
                    }
                    assert_eq!(calc_pe, pe_for_elem[elem]);
                }
            }
        }
    }
}