// lamellar/array/unsafe.rs
1//! This module provides an unsafe abstraction of a distributed array in Lamellar.
2
3pub(crate) mod handle;
4pub use handle::*;
5mod iteration;
6pub(crate) mod local_chunks;
7pub use local_chunks::*;
8pub(crate) mod operations;
9pub use operations::*;
10mod rdma;
11
12
13use crate::active_messaging::ActiveMessaging;
14use crate::active_messaging::*;
15// use crate::array::r#unsafe::operations::BUFOPS;
16use crate::array::private::{ArrayExecAm, LamellarArrayPrivate};
17use crate::array::*;
18use crate::array::{LamellarRead, LamellarWrite};
19use crate::barrier::BarrierHandle;
20use crate::darc::{Darc, DarcMode, WeakDarc};
21use crate::env_var::config;
22use crate::lamellae::AllocationType;
23use crate::lamellar_team::{IntoLamellarTeam, LamellarTeamRT};
24use crate::memregion::{Dist, MemoryRegion};
25use crate::scheduler::LamellarTask;
26use crate::warnings::RuntimeWarning;
27use crate::LamellarTaskGroup;
28
29use core::marker::PhantomData;
30use futures_util::{future, StreamExt};
31use std::ops::Bound;
32use std::pin::Pin;
33use std::sync::atomic::{AtomicUsize, Ordering};
34use std::sync::Arc;
35use std::time::Instant;
36
/// Shared, reference-counted state backing an `UnsafeArray`: the raw
/// registered memory allocation plus the runtime bookkeeping used to track
/// active messages issued against the array.
pub(crate) struct UnsafeArrayData {
    // Raw (untyped) registered memory region holding this PE's portion of the array.
    mem_region: MemoryRegion<u8>,
    // Counters (send/launched/outstanding) for active messages issued through this array.
    pub(crate) array_counters: Arc<AMCounters>,
    // The team (set of PEs) this array is allocated over.
    pub(crate) team: Pin<Arc<LamellarTeamRT>>,
    // Task group used to collectively await all array-related tasks.
    pub(crate) task_group: Arc<LamellarTaskGroup>,
    // This PE's id within the team.
    pub(crate) my_pe: usize,
    // Number of PEs in the team.
    pub(crate) num_pes: usize,
    // Count of outstanding requests issued through this array.
    req_cnt: Arc<AtomicUsize>,
}
46
47impl std::fmt::Debug for UnsafeArrayData {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 write!(
50 f,
51 "UnsafeArrayData{{ mem_region: {:?}, array_counters: {:?}, team: {:?}, task_group: {:?}, my_pe: {:?}, num_pes: {:?} req_cnt: {:?} }}",
52 self.mem_region, self.array_counters, self.team, self.task_group, self.my_pe, self.num_pes, self.req_cnt
53 )
54 }
55}
56
/// An unsafe abstraction of a distributed array.
///
/// This array type provides no guarantees on how its data is being accessed, either locally or from remote PEs.
///
/// UnsafeArrays are really intended for use in the runtime as the foundation for our safe array types.
///
/// # Warning
/// Unless you are very confident in low level distributed memory access it is highly recommended you utilize
/// the other LamellarArray types ([AtomicArray], [LocalLockArray], [GlobalLockArray], [ReadOnlyArray]) to construct and interact with distributed memory.
#[lamellar_impl::AmDataRT(Clone, Debug)]
pub struct UnsafeArray<T> {
    // Shared inner state (allocation, team, layout); cloning the array clones this handle.
    pub(crate) inner: UnsafeArrayInner,
    // Zero-sized marker tying the untyped inner storage to element type `T`.
    phantom: PhantomData<T>,
}
71
// Byte-oriented handle to an UnsafeArray's inner state, used internally so
// operations can be dispatched without knowing the element type `T`.
#[doc(hidden)]
#[lamellar_impl::AmDataRT(Clone, Debug)]
pub struct UnsafeByteArray {
    pub(crate) inner: UnsafeArrayInner,
}
77
78impl UnsafeByteArray {
79 pub(crate) fn downgrade(array: &UnsafeByteArray) -> UnsafeByteArrayWeak {
80 UnsafeByteArrayWeak {
81 inner: UnsafeArrayInner::downgrade(&array.inner),
82 }
83 }
84}
85
// Weak (non-owning) counterpart of UnsafeByteArray; must be upgraded before
// the underlying data can be accessed.
#[doc(hidden)]
#[lamellar_impl::AmLocalDataRT(Clone, Debug)]
pub struct UnsafeByteArrayWeak {
    pub(crate) inner: UnsafeArrayInnerWeak,
}
91
92impl UnsafeByteArrayWeak {
93 pub fn upgrade(&self) -> Option<UnsafeByteArray> {
94 if let Some(inner) = self.inner.upgrade() {
95 Some(UnsafeByteArray { inner })
96 } else {
97 None
98 }
99 }
100}
101
pub(crate) mod private {
    use super::UnsafeArrayData;
    use crate::array::Distribution;
    use crate::darc::Darc;
    // Strong handle to the array's shared state plus the layout metadata
    // (distribution, offset, size) describing the (sub)array view this
    // particular handle represents.
    #[lamellar_impl::AmDataRT(Clone, Debug)]
    pub struct UnsafeArrayInner {
        pub(crate) data: Darc<UnsafeArrayData>,
        pub(crate) distribution: Distribution,
        // Elements evenly assigned to each PE (full array size / num_pes).
        pub(crate) orig_elem_per_pe: usize,
        // The number of elements that can't be evenly divided amongst all PEs.
        pub(crate) orig_remaining_elems: usize,
        pub(crate) elem_size: usize, //for bytes array will be size of T, for T array will be 1
        pub(crate) offset: usize,    //relative to size of T
        pub(crate) size: usize,      //relative to size of T
        // True when this handle is a sub-array view of the original array.
        pub(crate) sub: bool,
    }
}
118use private::UnsafeArrayInner;
119
// Weak counterpart of UnsafeArrayInner: same layout metadata, but holds the
// shared state through a WeakDarc so it does not keep the array alive.
#[lamellar_impl::AmLocalDataRT(Clone, Debug)]
pub(crate) struct UnsafeArrayInnerWeak {
    pub(crate) data: WeakDarc<UnsafeArrayData>,
    pub(crate) distribution: Distribution,
    // Elements evenly assigned to each PE (full array size / num_pes).
    orig_elem_per_pe: usize,
    orig_remaining_elems: usize, // the number of elements that can't be evenly divided amongst all PES
    elem_size: usize, //for bytes array will be size of T, for T array will be 1
    offset: usize,    //relative to size of T
    size: usize,      //relative to size of T
    // True when this handle is a sub-array view of the original array.
    sub: bool,
}
131
132// impl Drop for UnsafeArrayInner {
133// fn drop(&mut self) {
134// // println!("unsafe array inner dropping");
135// }
136// }
137
138// impl<T: Dist> Drop for UnsafeArray<T> {
139// fn drop(&mut self) {
140// println!("Dropping unsafe array");
141// // self.wait_all();
142// }
143// }
144
impl<T: Dist + ArrayOps + 'static> UnsafeArray<T> {
    #[doc(alias = "Collective")]
    /// Construct a new UnsafeArray with a length of `array_size` whose data will be laid out with the provided `distribution` on the PE's specified by the `team`.
    /// `team` is commonly a [LamellarWorld][crate::LamellarWorld] or [LamellarTeam] (instance or reference).
    ///
    /// # Collective Operation
    /// Requires all PEs associated with the `team` to enter the constructor call otherwise deadlock will occur (i.e. team barriers are being called internally)
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    ///
    /// let world = LamellarWorldBuilder::new().build();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///```
    pub fn new<U: Into<IntoLamellarTeam>>(
        team: U,
        array_size: usize,
        distribution: Distribution,
    ) -> UnsafeArrayHandle<T> {
        let team = team.into().team.clone();
        // Construction is deferred: the returned handle owns the pinned
        // `async_new` future and only runs it when spawned/blocked/awaited.
        UnsafeArrayHandle {
            team: team.clone(),
            launched: false,
            creation_future: Box::pin(UnsafeArray::async_new(
                team,
                array_size,
                distribution,
                DarcMode::UnsafeArray,
            )),
        }
    }

    /// Collectively allocates and zero-initializes the array's backing memory.
    ///
    /// All team members must reach the initial barrier. `darc_mode` records
    /// which array flavor the resulting Darc-managed data belongs to.
    pub(crate) async fn async_new<U: Into<IntoLamellarTeam>>(
        team: U,
        array_size: usize,
        distribution: Distribution,
        darc_mode: DarcMode,
    ) -> UnsafeArray<T> {
        let team = team.into().team.clone();
        team.async_barrier().await;
        let task_group = LamellarTaskGroup::new(team.clone());
        let my_pe = team.team_pe_id().unwrap();
        let num_pes = team.num_pes();
        // Allocate at least one element per PE; if the requested size is
        // smaller, a sub-array view of the requested length is returned below.
        let full_array_size = std::cmp::max(array_size, num_pes);

        let elem_per_pe = full_array_size / num_pes;
        let remaining_elems = full_array_size % num_pes;
        let mut per_pe_size = elem_per_pe;
        if remaining_elems > 0 {
            // Round up so PEs that hold one of the leftover elements still fit.
            per_pe_size += 1
        }
        // Global allocation when the team spans the whole world, otherwise a
        // sub-allocation restricted to the team's PEs.
        let rmr_t: MemoryRegion<T> = if team.num_world_pes == team.num_pes {
            MemoryRegion::new(per_pe_size, team.lamellae.clone(), AllocationType::Global)
        } else {
            MemoryRegion::new(
                per_pe_size,
                team.lamellae.clone(),
                AllocationType::Sub(team.get_pes()),
            )
        };

        unsafe {
            if std::mem::needs_drop::<T>() {
                // If `T` needs to be dropped then we have to do this one item at a time, in
                // case one of the intermediate drops does a panic.
                panic!("Lamellar Arrays do not yet support elements that impl Drop");
            } else {
                // Otherwise we can be really fast and just fill everything with zeros.
                let len = std::mem::size_of_val::<[T]>(
                    rmr_t.as_mut_slice().expect("data should exist on pe"),
                );
                std::ptr::write_bytes(
                    rmr_t.as_mut_ptr().expect("data should exist on pe") as *mut u8,
                    0u8,
                    len,
                )
            }
        }
        // Erase the element type; the shared state tracks the memory as raw bytes.
        let rmr = unsafe { rmr_t.to_base::<u8>() };

        let data = Darc::async_try_new_with_drop(
            team.clone(),
            UnsafeArrayData {
                mem_region: rmr,
                array_counters: Arc::new(AMCounters::new()),
                team: team.clone(),
                task_group: Arc::new(task_group),
                my_pe: my_pe,
                num_pes: num_pes,
                req_cnt: Arc::new(AtomicUsize::new(0)),
            },
            darc_mode,
            None,
        )
        .await
        .expect("trying to create array on non team member");
        let array = UnsafeArray {
            inner: UnsafeArrayInner {
                data: data,
                distribution: distribution.clone(),
                orig_elem_per_pe: elem_per_pe,
                orig_remaining_elems: remaining_elems,
                elem_size: std::mem::size_of::<T>(),
                offset: 0,             //relative to size of T
                size: full_array_size, //relative to size of T
                sub: false,
            },
            phantom: PhantomData,
        };
        if full_array_size != array_size {
            // The allocation was padded up to num_pes elements; hand back a
            // sub-array view restricted to the size the caller asked for.
            println!("WARNING: Array size {array_size} is less than number of pes {full_array_size}, each PE will not contain data");
            array.sub_array(0..array_size)
        } else {
            array
        }
    }
}
impl<T: Dist + 'static> UnsafeArray<T> {
    #[doc(alias("One-sided", "onesided"))]
    /// Change the distribution this array handle uses to index into the data of the array.
    ///
    /// # One-sided Operation
    /// This is a one-sided call and does not redistribute or modify the actual data, it simply changes how the array is indexed for this particular handle.
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    /// // do something interesting... or not
    /// let block_view = array.clone().use_distribution(Distribution::Block);
    ///```
    pub fn use_distribution(mut self, distribution: Distribution) -> Self {
        self.inner.distribution = distribution;
        self
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Return the calling PE's local data as an immutable slice
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PEs may access this PE's local data.
    /// It is also possible to have mutable and immutable references to this array's data on the same PE
    ///
    /// # One-sided Operation
    /// Only returns local data on the calling PE
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// unsafe {
    ///     let slice = array.local_as_slice();
    ///     println!("PE{my_pe} data: {slice:?}");
    /// }
    ///```
    pub unsafe fn local_as_slice(&self) -> &[T] {
        // Delegates to the mutable accessor; the `&[T]` return type reborrows
        // the result immutably.
        self.local_as_mut_slice()
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Return the calling PE's local data as a mutable slice
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PEs may access this PE's local data.
    /// It is also possible to have mutable and immutable references to this array's data on the same PE
    ///
    /// # One-sided Operation
    /// Only returns local data on the calling PE
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// unsafe {
    ///     let slice = array.local_as_mut_slice();
    ///     for elem in slice{
    ///         *elem += 1;
    ///     }
    /// }
    ///```
    pub unsafe fn local_as_mut_slice(&self) -> &mut [T] {
        // Reinterpret this handle's local byte slice as a slice of `T`.
        let u8_slice = self.inner.local_as_mut_slice();
        std::slice::from_raw_parts_mut(
            u8_slice.as_mut_ptr() as *mut T,
            u8_slice.len() / std::mem::size_of::<T>(),
        )
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Return the calling PE's local data as an immutable slice
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PEs may access this PE's local data.
    /// It is also possible to have mutable and immutable references to this array's data on the same PE
    ///
    /// # One-sided Operation
    /// Only returns local data on the calling PE
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    /// unsafe {
    ///     let slice = array.local_data();
    ///     println!("PE{my_pe} data: {slice:?}");
    /// }
    ///```
    pub unsafe fn local_data(&self) -> &[T] {
        // Alias of `local_as_slice`, kept for API consistency with the safe
        // array types.
        self.local_as_mut_slice()
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Return the calling PE's local data as a mutable slice
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PEs may access this PE's local data.
    /// It is also possible to have mutable and immutable references to this array's data on the same PE
    ///
    /// # One-sided Operation
    /// Only returns local data on the calling PE
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// unsafe {
    ///     let slice = array.mut_local_data();
    ///     for elem in slice{
    ///         *elem += 1;
    ///     }
    /// }
    ///```
    pub unsafe fn mut_local_data(&self) -> &mut [T] {
        // Alias of `local_as_mut_slice`, kept for API consistency with the
        // safe array types.
        self.local_as_mut_slice()
    }

    // Raw pointer to the start of this PE's local data, cast to `T`.
    pub(crate) fn local_as_mut_ptr(&self) -> *mut T {
        let u8_ptr = unsafe { self.inner.local_as_mut_ptr() };
        u8_ptr as *mut T
    }

    /// Return the index range with respect to the original array over which this array handle represents
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// assert_eq!(array.sub_array_range(),(0..100));
    ///
    /// let sub_array = array.sub_array(25..75);
    /// assert_eq!(sub_array.sub_array_range(),(25..75));
    ///```
    pub fn sub_array_range(&self) -> std::ops::Range<usize> {
        self.inner.offset..(self.inner.offset + self.inner.size)
    }

    // The runtime team this array's data is allocated over.
    pub(crate) fn team_rt(&self) -> Pin<Arc<LamellarTeamRT>> {
        self.inner.data.team.clone()
    }

    /// Waits (asynchronously) until every active message issued through this
    /// array has been spawned and has completed.
    pub(crate) async fn await_all(&self) {
        let am_counters = self.inner.data.array_counters.clone();

        let mut temp_now = Instant::now();
        let mut orig_reqs = am_counters.send_req_cnt.load(Ordering::SeqCst);
        let mut orig_launched = am_counters.launched_req_cnt.load(Ordering::SeqCst);
        let mut done = false;
        while !done {
            // Spin (yielding to the scheduler) while requests are still
            // outstanding or new ones were issued since we last sampled the
            // counters; stop early if the team has entered a panic state.
            while self.team().panic.load(Ordering::SeqCst) == 0
                && ((am_counters.outstanding_reqs.load(Ordering::SeqCst) > 0)
                    || orig_reqs != am_counters.send_req_cnt.load(Ordering::SeqCst)
                    || orig_launched != am_counters.launched_req_cnt.load(Ordering::SeqCst))
            {
                orig_reqs = am_counters.send_req_cnt.load(Ordering::SeqCst);
                orig_launched = am_counters.launched_req_cnt.load(Ordering::SeqCst);
                async_std::task::yield_now().await;
                // Periodically report progress so a potential deadlock is visible.
                if temp_now.elapsed().as_secs_f64() > config().deadlock_timeout {
                    println!(
                        "in darc await_all mype: {:?} cnt: {:?} {:?}",
                        self.team_rt().world_pe,
                        am_counters.send_req_cnt.load(Ordering::SeqCst),
                        am_counters.outstanding_reqs.load(Ordering::SeqCst),
                    );
                    temp_now = Instant::now();
                }
            }
            // If requests were created but never spawned, warn the user.
            // Re-loop first if activity resumed while we were checking.
            if am_counters.send_req_cnt.load(Ordering::SeqCst)
                != am_counters.launched_req_cnt.load(Ordering::SeqCst)
            {
                if am_counters.outstanding_reqs.load(Ordering::SeqCst) > 0
                    || orig_reqs != am_counters.send_req_cnt.load(Ordering::SeqCst)
                    || orig_launched != am_counters.launched_req_cnt.load(Ordering::SeqCst)
                {
                    continue;
                }
                println!(
                    "in darc await_all mype: {:?} cnt: {:?} {:?} {:?}",
                    self.team_rt().world_pe,
                    am_counters.send_req_cnt.load(Ordering::SeqCst),
                    am_counters.outstanding_reqs.load(Ordering::SeqCst),
                    am_counters.launched_req_cnt.load(Ordering::SeqCst)
                );
                RuntimeWarning::UnspawnedTask(
                    "`await_all` before all tasks/active messages have been spawned",
                )
                .print();
            }
            done = true;
        }

        self.inner.data.task_group.await_all().await;
    }

    /// Waits for all outstanding operations on this array, then transitions
    /// the underlying Darc-managed data into `mode` (used by the `into_*`
    /// conversions below).
    pub(crate) async fn await_on_outstanding(&self, mode: DarcMode) {
        self.await_all().await;
        let array_darc = self.inner.data.clone();
        array_darc.block_on_outstanding(mode, 1).await;
    }

    #[doc(alias = "Collective")]
    /// Convert this UnsafeArray into a (safe) [ReadOnlyArray]
    ///
    /// This is a collective and blocking function which will only return when there is at most a single reference on each PE
    /// to this Array, and that reference is currently calling this function.
    ///
    /// When it returns, it is guaranteed that there are only `ReadOnlyArray` handles to the underlying data
    ///
    /// # Collective Operation
    /// Requires all PEs associated with the `array` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally)
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let read_only_array = array.into_read_only().block();
    ///```
    ///
    /// # Warning
    /// Because this call blocks there is the possibility for deadlock to occur, as highlighted below:
    ///```no_run
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let array1 = array.clone();
    /// let mut_slice = unsafe {array1.local_as_mut_slice()};
    ///
    /// // no borrows to this specific instance (array) so it can enter the "into_read_only" call
    /// // but array1 will not be dropped until after mut_slice is dropped.
    /// // Given the ordering of these calls we will get stuck in "into_read_only" as it
    /// // waits for the reference count to go down to "1" (but we will never be able to drop mut_slice/array1).
    /// let ro_array = array.into_read_only().block();
    /// ro_array.print();
    /// println!("{mut_slice:?}");
    ///```
    /// Instead we would want to do something like:
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let array1 = array.clone();
    /// let slice = unsafe {array1.local_data()};
    ///
    /// // do something interesting with the slice and then manually drop the slice and array1 to ensure the reference count for array is 1.
    /// println!("{:?}",slice[0]);
    /// drop(slice);
    /// drop(array1);
    /// // now we can call into_read_only and it will not deadlock
    /// let ro_array = array.into_read_only().block();
    /// ro_array.print();
    ///```
    pub fn into_read_only(self) -> IntoReadOnlyArrayHandle<T> {
        // The conversion itself (await outstanding ops + Darc mode change)
        // runs inside the handle's pinned future when it is driven.
        IntoReadOnlyArrayHandle {
            team: self.team_rt(),
            launched: false,
            outstanding_future: Box::pin(self.async_into()),
        }
    }

    #[doc(alias = "Collective")]
    /// Convert this UnsafeArray into a (safe) [LocalLockArray]
    ///
    /// This is a collective and blocking function which will only return when there is at most a single reference on each PE
    /// to this Array, and that reference is currently calling this function.
    ///
    /// When it returns, it is guaranteed that there are only `LocalLockArray` handles to the underlying data
    ///
    /// # Collective Operation
    /// Requires all PEs associated with the `array` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally)
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let local_lock_array = array.into_local_lock().block();
    ///```
    /// # Warning
    /// Because this call blocks there is the possibility for deadlock to occur, as highlighted below:
    ///```no_run
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let array1 = array.clone();
    /// let mut_slice = unsafe {array1.local_as_mut_slice()};
    ///
    /// // no borrows to this specific instance (array) so it can enter the "into_local_lock" call
    /// // but array1 will not be dropped until after mut_slice is dropped.
    /// // Given the ordering of these calls we will get stuck in "into_local_lock" as it
    /// // waits for the reference count to go down to "1" (but we will never be able to drop mut_slice/array1).
    /// let local_lock_array = array.into_local_lock().block();
    /// local_lock_array.print();
    /// println!("{mut_slice:?}");
    ///```
    /// Instead we would want to do something like:
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let array1 = array.clone();
    /// let slice = unsafe {array1.local_data()};
    ///
    /// // do something interesting with the slice and then manually drop the slice and array1 to ensure the reference count for array is 1.
    /// println!("{:?}",slice[0]);
    /// drop(slice);
    /// drop(array1);
    /// // now we can call into_local_lock and it will not deadlock
    /// let local_lock_array = array.into_local_lock().block();
    /// local_lock_array.print();
    ///```
    pub fn into_local_lock(self) -> IntoLocalLockArrayHandle<T> {
        // The conversion itself (await outstanding ops + Darc mode change)
        // runs inside the handle's pinned future when it is driven.
        IntoLocalLockArrayHandle {
            team: self.team_rt(),
            launched: false,
            outstanding_future: Box::pin(self.async_into()),
        }
    }

    #[doc(alias = "Collective")]
    /// Convert this UnsafeArray into a (safe) [GlobalLockArray]
    ///
    /// This is a collective and blocking function which will only return when there is at most a single reference on each PE
    /// to this Array, and that reference is currently calling this function.
    ///
    /// When it returns, it is guaranteed that there are only `GlobalLockArray` handles to the underlying data
    ///
    /// # Collective Operation
    /// Requires all PEs associated with the `array` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally)
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let global_lock_array = array.into_global_lock().block();
    ///```
    /// # Warning
    /// Because this call blocks there is the possibility for deadlock to occur, as highlighted below:
    ///```no_run
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let array1 = array.clone();
    /// let slice = unsafe {array1.local_data()};
    ///
    /// // no borrows to this specific instance (array) so it can enter the "into_global_lock" call
    /// // but array1 will not be dropped until after slice is dropped.
    /// // Given the ordering of these calls we will get stuck in "into_global_lock" as it
    /// // waits for the reference count to go down to "1" (but we will never be able to drop slice/array1).
    /// let global_lock_array = array.into_global_lock().block();
    /// global_lock_array.print();
    /// println!("{slice:?}");
    ///```
    /// Instead we would want to do something like:
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let array1 = array.clone();
    /// let slice = unsafe {array1.local_data()};
    ///
    /// // do something interesting with the slice and then manually drop the slice and array1 to ensure the reference count for array is 1.
    /// println!("{:?}",slice[0]);
    /// drop(slice);
    /// drop(array1);
    /// // now we can call into_global_lock and it will not deadlock
    /// let global_lock_array = array.into_global_lock().block();
    /// global_lock_array.print();
    ///```
    pub fn into_global_lock(self) -> IntoGlobalLockArrayHandle<T> {
        // The conversion itself (await outstanding ops + Darc mode change)
        // runs inside the handle's pinned future when it is driven.
        IntoGlobalLockArrayHandle {
            team: self.team_rt(),
            launched: false,
            outstanding_future: Box::pin(self.async_into()),
        }
    }

    // Returns a handle for an asynchronous barrier across the array's team.
    pub(crate) fn async_barrier(&self) -> BarrierHandle {
        self.inner.data.team.async_barrier()
    }
}
707
impl<T: Dist + 'static> UnsafeArray<T> {
    #[doc(alias = "Collective")]
    /// Convert this UnsafeArray into a (safe) [AtomicArray]
    ///
    /// This is a collective and blocking function which will only return when there is at most a single reference on each PE
    /// to this Array, and that reference is currently calling this function.
    ///
    /// When it returns, it is guaranteed that there are only `AtomicArray` handles to the underlying data
    ///
    /// # Collective Operation
    /// Requires all PEs associated with the `array` to enter the call otherwise deadlock will occur (i.e. team barriers are being called internally)
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let atomic_array = array.into_atomic().block();
    ///```
    /// # Warning
    /// Because this call blocks there is the possibility for deadlock to occur, as highlighted below:
    ///```no_run
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let my_pe = world.my_pe();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let array1 = array.clone();
    /// let mut_slice = unsafe {array1.local_as_mut_slice()};
    ///
    /// // no borrows to this specific instance (array) so it can enter the "into_atomic" call
    /// // but array1 will not be dropped until after mut_slice is dropped.
    /// // Given the ordering of these calls we will get stuck in "into_atomic" as it
    /// // waits for the reference count to go down to "1" (but we will never be able to drop mut_slice/array1).
    /// let atomic_array = array.into_atomic().block();
    /// atomic_array.print();
    /// println!("{mut_slice:?}");
    ///```
    /// Instead we would want to do something like:
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let array: UnsafeArray<usize> = UnsafeArray::new(&world,100,Distribution::Cyclic).block();
    ///
    /// let array1 = array.clone();
    /// let slice = unsafe {array1.local_data()};
    ///
    /// // do something interesting with the slice and then manually drop the slice and array1 to ensure the reference count for array is 1.
    /// println!("{:?}",slice[0]);
    /// drop(slice);
    /// drop(array1);
    /// // now we can call into_atomic and it will not deadlock
    /// let atomic_array = array.into_atomic().block();
    /// atomic_array.print();
    ///```
    pub fn into_atomic(self) -> IntoAtomicArrayHandle<T> {
        // The conversion itself (await outstanding ops + Darc mode change)
        // runs inside the handle's pinned future when it is driven.
        IntoAtomicArrayHandle {
            team: self.team_rt(),
            launched: false,
            outstanding_future: Box::pin(self.async_into()),
        }
    }
}
774
775// use crate::array::private::LamellarArrayPrivate;
776// impl <T: Dist, A: LamellarArrayPrivate<T>> From<A> for UnsafeArray<T>{
777// fn from(array: A) -> Self {
778// let array = array.into_inner();
779// array.block_on_outstanding(DarcMode::UnsafeArray);
780// array.create_buffered_ops();
781// array
782// }
783// }
784
785impl<T: Dist + ArrayOps> TeamFrom<(Vec<T>, Distribution)> for UnsafeArray<T> {
786 fn team_from(input: (Vec<T>, Distribution), team: &Arc<LamellarTeam>) -> Self {
787 let (vals, distribution) = input;
788 let input = (&vals, distribution);
789 TeamInto::team_into(input, team)
790 }
791}
792
793// #[async_trait]
impl<T: Dist + ArrayOps> AsyncTeamFrom<(Vec<T>, Distribution)> for UnsafeArray<T> {
    /// Collectively builds a distributed array from each PE's local values.
    ///
    /// Every PE publishes its local element count through a temporary
    /// `UnsafeArray<usize>`, computes the global size and its own start
    /// offset from the counts of lower-ranked PEs, allocates the result
    /// array, and finally `put`s its values into place.
    async fn team_from(input: (Vec<T>, Distribution), team: &Arc<LamellarTeam>) -> Self {
        let (local_vals, distribution) = input;
        let team = team.team.clone();
        team.async_barrier().await;
        // One slot per PE advertising how many elements this PE contributes.
        let local_sizes = UnsafeArray::<usize>::async_new(
            team.clone(),
            team.num_pes,
            Distribution::Block,
            crate::darc::DarcMode::UnsafeArray,
        )
        .await;
        unsafe {
            local_sizes.local_as_mut_slice()[0] = local_vals.len();
        }
        // Barrier so every PE's count is visible before we read them all.
        team.async_barrier().await;
        let mut size = 0;
        let mut my_start = 0;
        let my_pe = team.team_pe.expect("pe not part of team");
        unsafe {
            // Sum all counts for the global size; the counts of PEs ranked
            // below us give our starting offset.
            local_sizes
                .buffered_onesided_iter(team.num_pes)
                .into_stream()
                .enumerate()
                .for_each(|(i, local_size)| {
                    size += local_size;
                    if i < my_pe {
                        my_start += local_size;
                    }
                    future::ready(())
                })
                .await;
        }
        let array = UnsafeArray::<T>::async_new(
            team.clone(),
            size,
            distribution,
            crate::darc::DarcMode::UnsafeArray,
        )
        .await;
        if local_vals.len() > 0 {
            // Write this PE's contribution into its region of the new array.
            unsafe { array.put(my_start, local_vals).await };
        }
        team.async_barrier().await;
        array
    }
}
843
impl<T: Dist + ArrayOps> TeamFrom<(&Vec<T>, Distribution)> for UnsafeArray<T> {
    /// Blocking counterpart of the async `team_from`: collectively builds a
    /// distributed array from each PE's local values.
    fn team_from(input: (&Vec<T>, Distribution), team: &Arc<LamellarTeam>) -> Self {
        let (local_vals, distribution) = input;
        let team = team.team.clone();
        // One slot per PE advertising how many elements this PE contributes.
        let local_sizes =
            UnsafeArray::<usize>::new(team.clone(), team.num_pes, Distribution::Block).block();
        unsafe {
            local_sizes.local_as_mut_slice()[0] = local_vals.len();
        }
        // Barrier so every PE's count is visible before we read them all.
        local_sizes.barrier();
        let mut size = 0;
        let mut my_start = 0;
        let my_pe = team.team_pe.expect("pe not part of team");
        unsafe {
            // Sum all counts for the global size; the counts of PEs ranked
            // below us give our starting offset.
            local_sizes
                .buffered_onesided_iter(team.num_pes)
                .into_iter()
                .enumerate()
                .for_each(|(i, local_size)| {
                    size += local_size;
                    if i < my_pe {
                        my_start += local_size;
                    }
                });
        }
        let array = UnsafeArray::<T>::new(team.clone(), size, distribution).block();
        if local_vals.len() > 0 {
            // Write this PE's contribution into its region of the new array.
            array.block_on(unsafe { array.put(my_start, local_vals) });
        }
        array.barrier();
        array
    }
}
879
880// impl<T: Dist> From<AtomicArray<T>> for UnsafeArray<T> {
881// fn from(array: AtomicArray<T>) -> Self {
882// match array {
883// AtomicArray::NativeAtomicArray(array) => UnsafeArray::<T>::from(array),
884// AtomicArray::GenericAtomicArray(array) => UnsafeArray::<T>::from(array),
885// }
886// }
887// }
888
#[async_trait]
impl<T: Dist> AsyncFrom<AtomicArray<T>> for UnsafeArray<T> {
    /// Convert either flavor of `AtomicArray` into an `UnsafeArray` by dispatching
    /// on the variant-specific `AsyncFrom` implementations.
    async fn async_from(array: AtomicArray<T>) -> Self {
        match array {
            AtomicArray::NativeAtomicArray(array) => UnsafeArray::<T>::async_from(array).await,
            AtomicArray::GenericAtomicArray(array) => UnsafeArray::<T>::async_from(array).await,
        }
    }
}
898
#[async_trait]
impl<T: Dist> AsyncFrom<NativeAtomicArray<T>> for UnsafeArray<T> {
    /// Wait for the source array's outstanding operations to complete (transitioning
    /// its state to `DarcMode::UnsafeArray`), then reuse its underlying array.
    async fn async_from(array: NativeAtomicArray<T>) -> Self {
        array
            .array
            .await_on_outstanding(DarcMode::UnsafeArray)
            .await;
        array.array
    }
}
909
#[async_trait]
impl<T: Dist> AsyncFrom<GenericAtomicArray<T>> for UnsafeArray<T> {
    /// Wait for the source array's outstanding operations to complete (transitioning
    /// its state to `DarcMode::UnsafeArray`), then reuse its underlying array.
    async fn async_from(array: GenericAtomicArray<T>) -> Self {
        array
            .array
            .await_on_outstanding(DarcMode::UnsafeArray)
            .await;
        array.array
    }
}
920
#[async_trait]
impl<T: Dist> AsyncFrom<LocalLockArray<T>> for UnsafeArray<T> {
    /// Wait for the source array's outstanding operations to complete (transitioning
    /// its state to `DarcMode::UnsafeArray`), then reuse its underlying array.
    async fn async_from(array: LocalLockArray<T>) -> Self {
        array
            .array
            .await_on_outstanding(DarcMode::UnsafeArray)
            .await;
        array.array
    }
}
931
#[async_trait]
impl<T: Dist> AsyncFrom<GlobalLockArray<T>> for UnsafeArray<T> {
    /// Wait for the source array's outstanding operations to complete (transitioning
    /// its state to `DarcMode::UnsafeArray`), then reuse its underlying array.
    async fn async_from(array: GlobalLockArray<T>) -> Self {
        array
            .array
            .await_on_outstanding(DarcMode::UnsafeArray)
            .await;
        array.array
    }
}
942
#[async_trait]
impl<T: Dist> AsyncFrom<ReadOnlyArray<T>> for UnsafeArray<T> {
    /// Wait for the source array's outstanding operations to complete (transitioning
    /// its state to `DarcMode::UnsafeArray`), then reuse its underlying array.
    async fn async_from(array: ReadOnlyArray<T>) -> Self {
        array
            .array
            .await_on_outstanding(DarcMode::UnsafeArray)
            .await;
        array.array
    }
}
953
954impl<T: Dist> From<UnsafeByteArray> for UnsafeArray<T> {
955 fn from(array: UnsafeByteArray) -> Self {
956 UnsafeArray {
957 inner: array.inner,
958 phantom: PhantomData,
959 }
960 }
961}
962
963impl<T: Dist> From<&UnsafeByteArray> for UnsafeArray<T> {
964 fn from(array: &UnsafeByteArray) -> Self {
965 UnsafeArray {
966 inner: array.inner.clone(),
967 phantom: PhantomData,
968 }
969 }
970}
971
972impl<T: Dist> From<UnsafeArray<T>> for UnsafeByteArray {
973 fn from(array: UnsafeArray<T>) -> Self {
974 UnsafeByteArray { inner: array.inner }
975 }
976}
977
978impl<T: Dist> From<&UnsafeArray<T>> for UnsafeByteArray {
979 fn from(array: &UnsafeArray<T>) -> Self {
980 UnsafeByteArray {
981 inner: array.inner.clone(),
982 }
983 }
984}
985
986impl<T: Dist> From<UnsafeArray<T>> for LamellarByteArray {
987 fn from(array: UnsafeArray<T>) -> Self {
988 LamellarByteArray::UnsafeArray(array.into())
989 }
990}
991
992impl<T: Dist> From<LamellarByteArray> for UnsafeArray<T> {
993 fn from(array: LamellarByteArray) -> Self {
994 if let LamellarByteArray::UnsafeArray(array) = array {
995 array.into()
996 } else {
997 panic!("Expected LamellarByteArray::UnsafeArray")
998 }
999 }
1000}
1001
impl<T: Dist> ArrayExecAm<T> for UnsafeArray<T> {
    // Delegates to the inherent `team_rt` accessor on `UnsafeArray` (inherent methods
    // take precedence over the trait method, so this is not self-recursive).
    fn team_rt(&self) -> Pin<Arc<LamellarTeamRT>> {
        self.team_rt()
    }
    // Counters used to track active messages launched on behalf of this array.
    fn team_counters(&self) -> Arc<AMCounters> {
        self.inner.data.array_counters.clone()
    }
}
impl<T: Dist> LamellarArrayPrivate<T> for UnsafeArray<T> {
    fn inner_array(&self) -> &UnsafeArray<T> {
        self
    }
    // Both pointer accessors route through the inherent `local_as_mut_ptr`;
    // UnsafeArray makes no read/write distinction at this level.
    fn local_as_ptr(&self) -> *const T {
        self.local_as_mut_ptr()
    }
    fn local_as_mut_ptr(&self) -> *mut T {
        self.local_as_mut_ptr()
    }
    fn pe_for_dist_index(&self, index: usize) -> Option<usize> {
        self.inner.pe_for_dist_index(index)
    }
    // Sub-arrays require offset-aware index translation; full arrays use the
    // direct distribution mapping.
    fn pe_offset_for_dist_index(&self, pe: usize, index: usize) -> Option<usize> {
        if self.inner.sub {
            self.inner.pe_sub_offset_for_dist_index(pe, index)
        } else {
            self.inner.pe_full_offset_for_dist_index(pe, index)
        }
    }

    unsafe fn into_inner(self) -> UnsafeArray<T> {
        self
    }
    fn as_lamellar_byte_array(&self) -> LamellarByteArray {
        self.clone().into()
    }
}
1038
impl<T: Dist> ActiveMessaging for UnsafeArray<T> {
    type SinglePeAmHandle<R: AmDist> = AmHandle<R>;
    type MultiAmHandle<R: AmDist> = MultiAmHandle<R>;
    type LocalAmHandle<L> = LocalAmHandle<L>;
    // Launch `am` on every PE of the array's team, tracked by this array's counters.
    fn exec_am_all<F>(&self, am: F) -> Self::MultiAmHandle<F::Output>
    where
        F: RemoteActiveMessage + LamellarAM + Serde + AmDist,
    {
        self.inner
            .data
            .team
            .exec_am_all_tg(am, Some(self.team_counters()))
    }
    // Launch `am` on a single PE, tracked by this array's counters.
    fn exec_am_pe<F>(&self, pe: usize, am: F) -> Self::SinglePeAmHandle<F::Output>
    where
        F: RemoteActiveMessage + LamellarAM + Serde + AmDist,
    {
        self.inner
            .data
            .team
            .exec_am_pe_tg(pe, am, Some(self.team_counters()))
    }
    // Run `am` locally on this PE, tracked by this array's counters.
    fn exec_am_local<F>(&self, am: F) -> Self::LocalAmHandle<F::Output>
    where
        F: LamellarActiveMessage + LocalAM + 'static,
    {
        self.inner
            .data
            .team
            .exec_am_local_tg(am, Some(self.team_counters()))
    }
    // Block until all requests issued through this array have completed.
    // While waiting, this thread helps the scheduler make progress; if the wait
    // exceeds the configured deadlock timeout a progress report is printed and
    // the timer resets.
    fn wait_all(&self) {
        let mut temp_now = Instant::now();
        while self
            .inner
            .data
            .array_counters
            .outstanding_reqs
            .load(Ordering::SeqCst)
            > 0
            || self.inner.data.req_cnt.load(Ordering::SeqCst) > 0
        {
            self.inner.data.team.scheduler.exec_task(); // might as well do useful work while we wait
            if temp_now.elapsed().as_secs_f64() > config().deadlock_timeout {
                println!(
                    "in array wait_all mype: {:?} cnt: {:?} {:?} {:?}",
                    self.inner.data.team.world_pe,
                    self.inner
                        .data
                        .array_counters
                        .send_req_cnt
                        .load(Ordering::SeqCst),
                    self.inner
                        .data
                        .array_counters
                        .outstanding_reqs
                        .load(Ordering::SeqCst),
                    self.inner.data.req_cnt.load(Ordering::SeqCst)
                );
                temp_now = Instant::now();
            }
        }
        // A send/launch counter mismatch means some handle was created but never
        // spawned/awaited -- surface that as a runtime warning.
        if self
            .inner
            .data
            .array_counters
            .send_req_cnt
            .load(Ordering::SeqCst)
            != self
                .inner
                .data
                .array_counters
                .launched_req_cnt
                .load(Ordering::SeqCst)
        {
            println!(
                "in array wait_all cnt: {:?} {:?} {:?}",
                self.inner
                    .data
                    .array_counters
                    .send_req_cnt
                    .load(Ordering::SeqCst),
                self.inner
                    .data
                    .array_counters
                    .outstanding_reqs
                    .load(Ordering::SeqCst),
                self.inner
                    .data
                    .array_counters
                    .launched_req_cnt
                    .load(Ordering::SeqCst)
            );
            RuntimeWarning::UnspawnedTask(
                "`wait_all` on an array before all operations, iterators, etc, created by the array have been spawned",
            )
            .print();
        }
        self.inner.data.task_group.wait_all();
    }
    // Delegates to the inherent `await_all` on `UnsafeArray` (inherent methods
    // take precedence over the trait method, so this is not self-recursive).
    fn await_all(&self) -> impl Future<Output = ()> + Send {
        self.await_all()
    }
    fn barrier(&self) {
        self.inner.data.team.barrier()
    }
    fn async_barrier(&self) -> BarrierHandle {
        self.inner.data.team.async_barrier()
    }
    // Spawn `f` on the team's scheduler, registered against the world, team, and
    // array counters so outstanding-work tracking accounts for it.
    fn spawn<F: Future>(&self, f: F) -> LamellarTask<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send,
    {
        self.inner.data.team.scheduler.spawn_task(
            f,
            vec![
                self.inner.data.team.world_counters.clone(),
                self.inner.data.team.team_counters.clone(),
                self.inner.data.array_counters.clone(),
            ],
        )
    }
    fn block_on<F: Future>(&self, f: F) -> F::Output {
        self.inner.data.team.scheduler.block_on(f)
    }
    fn block_on_all<I>(&self, iter: I) -> Vec<<<I as IntoIterator>::Item as Future>::Output>
    where
        I: IntoIterator,
        <I as IntoIterator>::Item: Future + Send + 'static,
        <<I as IntoIterator>::Item as Future>::Output: Send,
    {
        self.inner.data.team.block_on_all(iter)
    }
}
1197
impl<T: Dist> LamellarArray<T> for UnsafeArray<T> {
    fn len(&self) -> usize {
        self.inner.size
    }

    fn num_elems_local(&self) -> usize {
        self.inner.num_elems_local()
    }
    // Map a (sub)array index to the `(pe, local_offset)` that owns it, or `None`
    // when the index is out of bounds.
    fn pe_and_offset_for_global_index(&self, index: usize) -> Option<(usize, usize)> {
        if self.inner.sub {
            // Sub-arrays must translate through the sub-array-aware offset computation.
            let pe = self.inner.pe_for_dist_index(index)?;
            let offset = self.inner.pe_sub_offset_for_dist_index(pe, index)?;
            Some((pe, offset))
        } else {
            self.inner.full_pe_and_offset_for_global_index(index)
        }
    }

    fn first_global_index_for_pe(&self, pe: usize) -> Option<usize> {
        self.inner.start_index_for_pe(pe)
    }

    fn last_global_index_for_pe(&self, pe: usize) -> Option<usize> {
        self.inner.end_index_for_pe(pe)
    }
}
1233
impl<T: Dist> LamellarEnv for UnsafeArray<T> {
    // All environment queries delegate to the data cached at construction or to
    // the team this array was allocated on.
    fn my_pe(&self) -> usize {
        self.inner.data.my_pe
    }

    fn num_pes(&self) -> usize {
        self.inner.data.num_pes
    }

    fn num_threads_per_pe(&self) -> usize {
        self.inner.data.team.num_threads()
    }
    fn world(&self) -> Arc<LamellarTeam> {
        self.inner.data.team.world()
    }
    fn team(&self) -> Arc<LamellarTeam> {
        self.inner.data.team.team()
    }
}
1253
// Marker traits: UnsafeArray permits both read and write access (no enforcement --
// safety is the caller's responsibility, per the type's contract).
impl<T: Dist> LamellarWrite for UnsafeArray<T> {}
impl<T: Dist> LamellarRead for UnsafeArray<T> {}
1256
1257impl<T: Dist> SubArray<T> for UnsafeArray<T> {
1258 type Array = UnsafeArray<T>;
1259 fn sub_array<R: std::ops::RangeBounds<usize>>(&self, range: R) -> Self::Array {
1260 let start = match range.start_bound() {
1261 //inclusive
1262 Bound::Included(idx) => *idx,
1263 Bound::Excluded(idx) => *idx + 1,
1264 Bound::Unbounded => 0,
1265 };
1266 let end = match range.end_bound() {
1267 //exclusive
1268 Bound::Included(idx) => *idx + 1,
1269 Bound::Excluded(idx) => *idx,
1270 Bound::Unbounded => self.inner.size,
1271 };
1272 if end > self.inner.size {
1273 panic!(
1274 "subregion range ({:?}-{:?}) exceeds size of array {:?}",
1275 start, end, self.inner.size
1276 );
1277 }
1278 // println!(
1279 // "new inner start {:?} end {:?} size {:?} cur offset {:?} cur size {:?}",
1280 // start,
1281 // end,
1282 // end - start,
1283 // self.inner.offset,
1284 // self.inner.size
1285 // );
1286 let mut inner = self.inner.clone();
1287 inner.offset += start;
1288 inner.size = end - start;
1289 inner.sub = true;
1290 UnsafeArray {
1291 inner: inner,
1292 phantom: PhantomData,
1293 }
1294 }
1295 fn global_index(&self, sub_index: usize) -> usize {
1296 self.inner.offset + sub_index
1297 }
1298}
1299
impl<T: Dist + std::fmt::Debug> UnsafeArray<T> {
    #[doc(alias = "Collective")]
    /// Print the data within a lamellar array
    ///
    /// # Collective Operation
    /// Requires all PEs associated with the array to enter the print call otherwise deadlock will occur (i.e. barriers are being called internally)
    ///
    /// # Examples
    ///```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let block_array = UnsafeArray::<usize>::new(&world,100,Distribution::Block).block();
    /// let cyclic_array = UnsafeArray::<usize>::new(&world,100,Distribution::Block).block();
    ///
    /// unsafe{
    ///     let _ =block_array.dist_iter_mut().enumerate().for_each(move |(i,elem)| {
    ///         *elem = i;
    ///     }).spawn();
    ///     let _ = cyclic_array.dist_iter_mut().enumerate().for_each(move |(i,elem)| {
    ///         *elem = i;
    ///     }).spawn();
    /// }
    /// world.wait_all();
    /// block_array.print();
    /// println!();
    /// cyclic_array.print();
    ///```
    pub fn print(&self) {
        // Fully qualified call to disambiguate from this inherent method itself.
        <UnsafeArray<T> as ArrayPrint<T>>::print(&self);
    }
}
1331
impl<T: Dist + std::fmt::Debug> ArrayPrint<T> for UnsafeArray<T> {
    // Print each PE's local slice in rank order, with a barrier between ranks so
    // output from different PEs does not interleave.
    fn print(&self) {
        self.inner.data.team.tasking_barrier(); //TODO: have barrier accept a string so we can print where we are stalling.
        for pe in 0..self.inner.data.team.num_pes() {
            self.inner.data.team.tasking_barrier();
            if self.inner.data.my_pe == pe {
                println!("[pe {:?} data] {:?}", pe, unsafe { self.local_as_slice() });
            }
            // Best-effort delay to let this rank's output flush before the next prints.
            std::thread::sleep(std::time::Duration::from_millis(100));
        }
    }
}
1344
1345impl<T: Dist + AmDist + 'static> UnsafeArray<T> {
1346 pub(crate) fn get_reduction_op(
1347 &self,
1348 op: &str,
1349 byte_array: LamellarByteArray,
1350 ) -> LamellarArcAm {
1351 REDUCE_OPS
1352 .get(&(std::any::TypeId::of::<T>(), op))
1353 .expect("unexpected reduction type")(byte_array, self.inner.data.team.num_pes())
1354 }
1355 pub(crate) fn reduce_data(
1356 &self,
1357 op: &str,
1358 byte_array: LamellarByteArray,
1359 ) -> AmHandle<Option<T>> {
1360 let func = self.get_reduction_op(op, byte_array);
1361 if let Ok(my_pe) = self.inner.data.team.team_pe_id() {
1362 self.inner.data.team.exec_arc_am_pe::<Option<T>>(
1363 my_pe,
1364 func,
1365 Some(self.inner.data.array_counters.clone()),
1366 )
1367 } else {
1368 self.inner.data.team.exec_arc_am_pe::<Option<T>>(
1369 0,
1370 func,
1371 Some(self.inner.data.array_counters.clone()),
1372 )
1373 }
1374 }
1375}
1376
// This is essentially impl LamellarArrayReduce, but we mean to explicitly have UnsafeArray expose these as unsafe functions
impl<T: Dist + AmDist + 'static> UnsafeArray<T> {
    #[doc(alias("One-sided", "onesided"))]
    /// Perform a reduction on the entire distributed array, returning the value to the calling PE.
    ///
    /// Please see the documentation for the [register_reduction] procedural macro for
    /// more details and examples on how to create your own reductions.
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's or local threads may access this PE's local data.
    /// Any updates to local data are not guaranteed to be Atomic.
    ///
    /// # One-sided Operation
    /// The calling PE is responsible for launching `Reduce` active messages on the other PEs associated with the array.
    /// the returned reduction result is only available on the calling PE
    /// # Note
    /// The future returned by this function is lazy and does nothing unless awaited, [spawned][AmHandle::spawn] or [blocked on][AmHandle::block]
    /// # Examples
    /// ```
    /// use lamellar::array::prelude::*;
    /// use rand::Rng;
    /// let world = LamellarWorldBuilder::new().build();
    /// let num_pes = world.num_pes();
    /// let array = UnsafeArray::<usize>::new(&world,1000000,Distribution::Block).block();
    /// let array_clone = array.clone();
    /// unsafe { // THIS IS NOT SAFE -- we are randomly updating elements, no protections, updates may be lost... DONT DO THIS
    ///     let req = array.local_iter().for_each(move |_| {
    ///         let index = rand::thread_rng().gen_range(0..array_clone.len());
    ///         let _ = array_clone.add(index,1).spawn(); //randomly add one to an element in the array.
    ///     }).spawn();
    /// }
    /// array.wait_all();
    /// array.barrier();
    /// let sum = unsafe {array.reduce("sum").block()}; // equivalent to calling array.sum()
    /// //assert_eq!(array.len()*num_pes,sum); // may or may not fail
    ///```
    #[must_use = "this function is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
    pub unsafe fn reduce(&self, op: &str) -> AmHandle<Option<T>> {
        self.reduce_data(op, self.clone().into())
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Perform a sum reduction on the entire distributed array, returning the value to the calling PE.
    ///
    /// This is equivalent to `reduce("sum")`.
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's or local threads may access this PE's local data.
    /// Any updates to local data are not guaranteed to be Atomic.
    ///
    /// # One-sided Operation
    /// The calling PE is responsible for launching `Sum` active messages on the other PEs associated with the array.
    /// the returned sum reduction result is only available on the calling PE
    /// # Note
    /// The future returned by this function is lazy and does nothing unless awaited, [spawned][AmHandle::spawn] or [blocked on][AmHandle::block]
    /// # Examples
    /// ```
    /// use lamellar::array::prelude::*;
    /// use rand::Rng;
    /// let world = LamellarWorldBuilder::new().build();
    /// let num_pes = world.num_pes();
    /// let array = UnsafeArray::<usize>::new(&world,1000000,Distribution::Block).block();
    /// let array_clone = array.clone();
    /// unsafe { // THIS IS NOT SAFE -- we are randomly updating elements, no protections, updates may be lost... DONT DO THIS
    ///     let req = array.local_iter().for_each(move |_| {
    ///         let index = rand::thread_rng().gen_range(0..array_clone.len());
    ///         let _ = array_clone.add(index,1).spawn(); //randomly add one to an element in the array.
    ///     }).spawn();
    /// }
    /// array.wait_all();
    /// array.barrier();
    /// let sum = unsafe{array.sum().block()}; //Safe in this instance as we have ensured no updates are currently happening
    /// // assert_eq!(array.len()*num_pes,sum);//this may or may not fail
    ///```
    #[must_use = "this function is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
    pub unsafe fn sum(&self) -> AmHandle<Option<T>> {
        self.reduce("sum")
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Perform a product reduction on the entire distributed array, returning the value to the calling PE.
    ///
    /// This is equivalent to `reduce("prod")`.
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's or local threads may access this PE's local data.
    /// Any updates to local data are not guaranteed to be Atomic.
    ///
    /// # One-sided Operation
    /// The calling PE is responsible for launching `Prod` active messages on the other PEs associated with the array.
    /// the returned prod reduction result is only available on the calling PE
    /// # Note
    /// The future returned by this function is lazy and does nothing unless awaited, [spawned][AmHandle::spawn] or [blocked on][AmHandle::block]
    /// # Examples
    /// ```
    /// use lamellar::array::prelude::*;
    /// use rand::Rng;
    /// let world = LamellarWorldBuilder::new().build();
    /// let num_pes = world.num_pes();
    /// let array = UnsafeArray::<usize>::new(&world,10,Distribution::Block).block();
    /// unsafe {
    ///     let req = array.dist_iter_mut().enumerate().for_each(move |(i,elem)| {
    ///         *elem = i+1;
    ///     }).spawn();
    /// }
    /// array.print();
    /// array.wait_all();
    /// array.print();
    /// let prod = unsafe{ array.prod().block().expect("array len > 0")};
    /// assert_eq!((1..=array.len()).product::<usize>(),prod);
    ///```
    #[must_use = "this function is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
    pub unsafe fn prod(&self) -> AmHandle<Option<T>> {
        self.reduce("prod")
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Find the max element in the entire distributed array, returning to the calling PE
    ///
    /// This is equivalent to `reduce("max")`.
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's or local threads may access this PE's local data.
    /// Any updates to local data are not guaranteed to be Atomic.
    ///
    /// # One-sided Operation
    /// The calling PE is responsible for launching `Max` active messages on the other PEs associated with the array.
    /// the returned max reduction result is only available on the calling PE
    /// # Note
    /// The future returned by this function is lazy and does nothing unless awaited, [spawned][AmHandle::spawn] or [blocked on][AmHandle::block]
    /// # Examples
    /// ```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let num_pes = world.num_pes();
    /// let array = UnsafeArray::<usize>::new(&world,10,Distribution::Block).block();
    /// let array_clone = array.clone();
    /// let _ = unsafe{array.dist_iter_mut().enumerate().for_each(|(i,elem)| *elem = i*2).spawn()}; //safe as we are accessing in a data parallel fashion
    /// array.wait_all();
    /// array.barrier();
    /// let max_req = unsafe{array.max()}; //Safe in this instance as we have ensured no updates are currently happening
    /// let max = max_req.block().expect("array len > 0");
    /// assert_eq!((array.len()-1)*2,max);
    ///```
    #[must_use = "this function is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
    pub unsafe fn max(&self) -> AmHandle<Option<T>> {
        self.reduce("max")
    }

    #[doc(alias("One-sided", "onesided"))]
    /// Find the min element in the entire distributed array, returning to the calling PE
    ///
    /// This is equivalent to `reduce("min")`.
    ///
    /// # Safety
    /// Data in UnsafeArrays are always unsafe as there are no protections on how remote PE's or local threads may access this PE's local data.
    /// Any updates to local data are not guaranteed to be Atomic.
    ///
    /// # One-sided Operation
    /// The calling PE is responsible for launching `Min` active messages on the other PEs associated with the array.
    /// the returned min reduction result is only available on the calling PE
    /// # Note
    /// The future returned by this function is lazy and does nothing unless awaited, [spawned][AmHandle::spawn] or [blocked on][AmHandle::block]
    /// # Examples
    /// ```
    /// use lamellar::array::prelude::*;
    /// let world = LamellarWorldBuilder::new().build();
    /// let num_pes = world.num_pes();
    /// let array = UnsafeArray::<usize>::new(&world,10,Distribution::Block).block();
    /// let array_clone = array.clone();
    /// let _ = unsafe{array.dist_iter_mut().enumerate().for_each(|(i,elem)| *elem = i*2).spawn()}; //safe as we are accessing in a data parallel fashion
    /// array.wait_all();
    /// array.barrier();
    /// let min_req = unsafe{array.min()}; //Safe in this instance as we have ensured no updates are currently happening
    /// let min = min_req.block().expect("array len > 0");
    /// assert_eq!(0,min);
    ///```
    #[must_use = "this function is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it "]
    pub unsafe fn min(&self) -> AmHandle<Option<T>> {
        self.reduce("min")
    }
}
1559
1560impl UnsafeArrayInnerWeak {
1561 pub(crate) fn upgrade(&self) -> Option<UnsafeArrayInner> {
1562 if let Some(data) = self.data.upgrade() {
1563 Some(UnsafeArrayInner {
1564 data: data,
1565 distribution: self.distribution.clone(),
1566 orig_elem_per_pe: self.orig_elem_per_pe,
1567 orig_remaining_elems: self.orig_remaining_elems,
1568 elem_size: self.elem_size,
1569 offset: self.offset,
1570 size: self.size,
1571 sub: self.sub,
1572 })
1573 } else {
1574 None
1575 }
1576 }
1577}
1578
1579impl UnsafeArrayInner {
    /// Spawn `f` on the team's scheduler, registered against the world, team, and
    /// array counters so outstanding-work tracking accounts for it.
    pub(crate) fn spawn<F: Future>(&self, f: F) -> LamellarTask<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send,
    {
        self.data.team.scheduler.spawn_task(
            f,
            vec![
                self.data.team.world_counters.clone(),
                self.data.team.team_counters.clone(),
                self.data.array_counters.clone(),
            ],
        )
    }
    /// Drive `f` to completion on the team's scheduler, blocking the current thread.
    pub(crate) fn block_on<F: Future>(&self, f: F) -> F::Output {
        self.data.team.scheduler.block_on(f)
    }
    /// Create a weak handle to `array` that does not keep its data alive;
    /// layout metadata is copied so the handle can be upgraded later.
    pub(crate) fn downgrade(array: &UnsafeArrayInner) -> UnsafeArrayInnerWeak {
        UnsafeArrayInnerWeak {
            data: Darc::downgrade(&array.data),
            distribution: array.distribution.clone(),
            orig_elem_per_pe: array.orig_elem_per_pe,
            orig_remaining_elems: array.orig_remaining_elems,
            elem_size: array.elem_size,
            offset: array.offset,
            size: array.size,
            sub: array.sub,
        }
    }
1609
    /// Map a global index (ignoring any sub-array offset) to its owning
    /// `(pe, local_offset)`; returns `None` if `index` is out of bounds.
    pub(crate) fn full_pe_and_offset_for_global_index(
        &self,
        index: usize,
    ) -> Option<(usize, usize)> {
        if self.size > index {
            let global_index = index;
            match self.distribution {
                Distribution::Block => {
                    // The first `orig_remaining_elems` PEs hold one extra element each;
                    // `rem_index` is the first global index past those larger blocks.
                    let rem_index = self.orig_remaining_elems * (self.orig_elem_per_pe + 1);

                    let (pe, offset) = if global_index < rem_index {
                        // index is on a PE holding `orig_elem_per_pe + 1` elements
                        let pe = global_index / (self.orig_elem_per_pe + 1); // accounts for the remaining elems
                        let offset = global_index - (pe * (self.orig_elem_per_pe + 1));
                        (pe, offset)
                    } else {
                        // index is on a PE holding exactly `orig_elem_per_pe` elements
                        let temp_index = global_index - rem_index; // index remaining after accounting for PEs with extra elements
                        let temp_pe = temp_index / self.orig_elem_per_pe; // the pe after accounting for PEs with extra elements
                        let pe = self.orig_remaining_elems // N pes that have extra elements
                            + temp_pe;
                        let offset = temp_index - (temp_pe * self.orig_elem_per_pe);
                        (pe, offset)
                    };
                    Some((pe, offset))
                }
                Distribution::Cyclic => {
                    // Round robin: element i lives on pe i % num_pes at slot i / num_pes.
                    let res = Some((
                        global_index % self.data.num_pes,
                        global_index / self.data.num_pes,
                    ));
                    res
                }
            }
        } else {
            None
        }
    }
1648
    // index is relative to the (sub)array (i.e. index=0 doesn't necessarily live on pe=0)
    /// Return the PE owning (sub)array `index`, or `None` if out of bounds.
    pub(crate) fn pe_for_dist_index(&self, index: usize) -> Option<usize> {
        if self.size > index {
            let global_index = index + self.offset;

            match self.distribution {
                Distribution::Block => {
                    // First global index past the PEs that hold one extra element.
                    let rem_index = self.orig_remaining_elems * (self.orig_elem_per_pe + 1);
                    let pe = if global_index < rem_index {
                        global_index / (self.orig_elem_per_pe + 1) // accounts for the remaining elems
                    } else {
                        self.orig_remaining_elems // N pes that have extra elements
                            + ((global_index - rem_index) // index remaining after accounting for PEs with extra elements
                            / self.orig_elem_per_pe)
                    };
                    Some(pe)
                }
                Distribution::Cyclic => Some(global_index % self.data.num_pes),
            }
        } else {
            None
        }
    }
1674
    //index relative to subarray, return offset relative to subarray
    pub(crate) fn pe_full_offset_for_dist_index(&self, pe: usize, index: usize) -> Option<usize> {
        let global_index = self.offset + index;
        match self.distribution {
            Distribution::Block => {
                // First global index past the PEs that hold one extra element.
                let rem_index = self.orig_remaining_elems * (self.orig_elem_per_pe + 1);
                let offset = if global_index < rem_index {
                    // index is on a pe with extra elems
                    global_index - (pe * (self.orig_elem_per_pe + 1))
                } else {
                    // index is on a pe without extra elems
                    let temp_index = global_index - rem_index; // index remaining after accounting for PEs with extra elements
                    let temp_pe = temp_index / self.orig_elem_per_pe; // the pe after accounting for PEs with extra elements

                    temp_index - (temp_pe * self.orig_elem_per_pe)
                };
                Some(offset)
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                // Only meaningful when `pe` actually owns this index in the
                // round-robin layout; otherwise there is no local offset.
                if global_index % num_pes == pe {
                    Some(index / num_pes)
                } else {
                    None
                }
            }
        }
    }
1706
    //index relative to subarray, return offset relative to subarray
    pub(crate) fn pe_sub_offset_for_dist_index(&self, pe: usize, index: usize) -> Option<usize> {
        // PE that owns the first element of this (sub)array.
        let start_pe = self.pe_for_dist_index(0)?;

        match self.distribution {
            Distribution::Block => {
                if start_pe == pe {
                    // On the starting PE the sub-array-relative offset is the index itself.
                    if index < self.size {
                        Some(index)
                    } else {
                        None
                    }
                } else {
                    self.pe_full_offset_for_dist_index(pe, index)
                }
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                // Only meaningful when `pe` owns this index in the round-robin layout.
                if (index + self.offset) % num_pes == pe {
                    Some(index / num_pes)
                } else {
                    None
                }
            }
        }
    }
1737
    //index is local with respect to subarray
    //returns local offset relative to full array
    pub(crate) fn pe_full_offset_for_local_index(&self, pe: usize, index: usize) -> Option<usize> {
        let global_index = self.global_index_from_local(index)?;
        match self.distribution {
            Distribution::Block => {
                // [pe_start_index, pe_end_index) is the global range held by `pe`.
                let pe_start_index = self.global_start_index_for_pe(pe);
                let mut pe_end_index = pe_start_index + self.orig_elem_per_pe;
                if pe < self.orig_remaining_elems {
                    // This PE holds one of the leftover elements.
                    pe_end_index += 1;
                }
                if pe_start_index <= global_index && global_index < pe_end_index {
                    Some(global_index - pe_start_index)
                } else {
                    None
                }
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                if global_index % num_pes == pe {
                    Some(global_index / num_pes)
                } else {
                    None
                }
            }
        }
    }
1766
    //index is local with respect to subarray
    //returns index with respect to original full length array
    pub(crate) fn global_index_from_local(&self, index: usize) -> Option<usize> {
        let my_pe = self.data.my_pe;
        match self.distribution {
            Distribution::Block => {
                let global_start = self.global_start_index_for_pe(my_pe);
                // Where this PE's data begins relative to the (sub)array's offset;
                // negative means the (sub)array starts on or after this PE.
                let start = global_start as isize - self.offset as isize;
                if start >= 0 {
                    //the (sub)array starts before my pe
                    if (start as usize) < self.size {
                        //sub(array) exists on my node
                        Some(global_start as usize + index)
                    } else {
                        //sub array does not exist on my node
                        None
                    }
                } else {
                    //inner starts on or after my pe
                    let mut global_end = global_start + self.orig_elem_per_pe;
                    if my_pe < self.orig_remaining_elems {
                        // this PE holds one of the leftover elements
                        global_end += 1;
                    }
                    if self.offset < global_end {
                        //the (sub)array starts on my pe
                        Some(self.offset + index)
                    } else {
                        //the (sub)array starts after my pe
                        None
                    }
                }
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                let start_pe = match self.pe_for_dist_index(0) {
                    Some(i) => i,
                    None => panic!("index 0 out of bounds for array of length {:?}", self.size),
                };
                let end_pe = match self.pe_for_dist_index(self.size - 1) {
                    Some(i) => i,
                    None => panic!(
                        "index {:?} out of bounds for array of length {:?}",
                        self.size - 1,
                        self.size
                    ),
                };

                // Base number of elements every PE holds from this (sub)array.
                let mut num_elems = self.size / num_pes;

                if self.size % num_pes != 0 {
                    // Leftover elements land on the PEs from start_pe through end_pe
                    // (inclusive), possibly wrapping around the end of the team.
                    if start_pe <= end_pe {
                        //no wrap around occurs
                        if start_pe <= my_pe && my_pe <= end_pe {
                            num_elems += 1;
                        }
                    } else {
                        //wrap around occurs
                        if start_pe <= my_pe || my_pe <= end_pe {
                            num_elems += 1;
                        }
                    }
                }

                if index < num_elems {
                    if start_pe <= my_pe {
                        Some(num_pes * index + self.offset + (my_pe - start_pe))
                    } else {
                        Some(num_pes * index + self.offset + (num_pes - start_pe) + my_pe)
                    }
                } else {
                    None
                }
            }
        }
    }
1846
1847 //index is local with respect to subarray
    //returns index with respect to the subarray
1849 // //#[tracing::instrument(skip_all)]
1850 pub(crate) fn subarray_index_from_local(&self, index: usize) -> Option<usize> {
1851 let my_pe = self.data.my_pe;
1852 let my_start_index = self.start_index_for_pe(my_pe)?; //None means subarray doesnt exist on this PE
1853 match self.distribution {
1854 Distribution::Block => {
1855 if my_start_index + index < self.size {
1856 //local index is in subarray
1857 Some(my_start_index + index)
1858 } else {
1859 //local index outside subarray
1860 None
1861 }
1862 }
1863 Distribution::Cyclic => {
1864 let num_pes = self.data.num_pes;
1865 let num_elems_local = self.num_elems_local();
1866 if index < num_elems_local {
1867 //local index is in subarray
1868 Some(my_start_index + num_pes * index)
1869 } else {
1870 //local index outside subarray
1871 None
1872 }
1873 }
1874 }
1875 }
1876
1877 // return index relative to the full array
1878 pub(crate) fn global_start_index_for_pe(&self, pe: usize) -> usize {
1879 match self.distribution {
1880 Distribution::Block => {
1881 let global_start = self.orig_elem_per_pe * pe;
1882 global_start + std::cmp::min(pe, self.orig_remaining_elems)
1883 }
1884 Distribution::Cyclic => pe,
1885 }
1886 }
1887 //return index relative to the subarray
1888 // //#[tracing::instrument(skip_all)]
    /// Returns the subarray-relative index of the first subarray element stored on
    /// `pe`, or `None` when the subarray has no elements on `pe`.
    pub(crate) fn start_index_for_pe(&self, pe: usize) -> Option<usize> {
        match self.distribution {
            Distribution::Block => {
                let global_start = self.global_start_index_for_pe(pe);
                // Signed distance from the subarray's start to `pe`'s first element;
                // negative means the subarray begins within (or after) `pe`'s range.
                let start = global_start as isize - self.offset as isize;
                if start >= 0 {
                    //the (sub)array starts before my pe
                    if (start as usize) < self.size {
                        //sub(array) exists on my node
                        Some(start as usize)
                    } else {
                        //sub array does not exist on my node
                        None
                    }
                } else {
                    // `pe`'s exclusive end in the full array, including the extra
                    // element granted to the first `orig_remaining_elems` PEs.
                    let mut global_end = global_start + self.orig_elem_per_pe;
                    if pe < self.orig_remaining_elems {
                        global_end += 1;
                    }
                    if self.offset < global_end {
                        //the (sub)array starts on my pe
                        Some(0)
                    } else {
                        //the (sub)array starts after my pe
                        None
                    }
                }
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                if let Some(start_pe) = self.pe_for_dist_index(0) {
                    // Only the first round of dealing can contain a given PE's first
                    // element; a subarray shorter than the team covers fewer PEs.
                    let temp_len = if self.size < num_pes {
                        //sub array might not exist on my array
                        self.size
                    } else {
                        num_pes
                    };
                    // Subarray element `i` lives on PE `(i + start_pe) % num_pes`;
                    // the first hit is `pe`'s first subarray index.
                    for i in 0..temp_len {
                        if (i + start_pe) % num_pes == pe {
                            return Some(i);
                        }
                    }
                }
                None
            }
        }
    }
1936
1937 #[allow(dead_code)]
1938 pub(crate) fn global_end_index_for_pe(&self, pe: usize) -> usize {
1939 self.global_start_index_for_pe(pe) + self.num_elems_pe(pe)
1940 }
1941
1942 //return index relative to the subarray
1943 // //#[tracing::instrument(skip_all)]
1944 pub(crate) fn end_index_for_pe(&self, pe: usize) -> Option<usize> {
1945 let start_i = self.start_index_for_pe(pe)?;
1946 match self.distribution {
1947 Distribution::Block => {
1948 //(sub)array ends on our pe
1949 if pe == self.pe_for_dist_index(self.size - 1)? {
1950 Some(self.size - 1)
1951 } else {
1952 // (sub)array ends on another pe
1953 Some(self.start_index_for_pe(pe + 1)? - 1)
1954 }
1955 }
1956 Distribution::Cyclic => {
1957 let num_elems = self.num_elems_pe(pe);
1958 let num_pes = self.data.num_pes;
1959 let end_i = start_i + (num_elems - 1) * num_pes;
1960 Some(end_i)
1961 }
1962 }
1963 }
1964
1965 // //#[tracing::instrument(skip_all)]
    /// Returns the number of subarray elements residing on `pe` (0 when none).
    pub(crate) fn num_elems_pe(&self, pe: usize) -> usize {
        match self.distribution {
            Distribution::Block => {
                if let Some(start_i) = self.start_index_for_pe(pe) {
                    //inner starts before or on pe
                    // Count is the gap to the next PE's first subarray index, or to
                    // the subarray's end when `pe` holds its tail.
                    let end_i = if let Some(end_i) = self.start_index_for_pe(pe + 1) {
                        //inner ends after pe
                        end_i
                    } else {
                        //inner ends on pe
                        self.size
                    };
                    // println!("num_elems_pe pe {:?} si {:?} ei {:?}",pe,start_i,end_i);
                    end_i - start_i
                } else {
                    0
                }
            }
            Distribution::Cyclic => {
                let num_pes = self.data.num_pes;
                if let Some(start_pe) = self.pe_for_dist_index(0) {
                    let end_pe = match self.pe_for_dist_index(self.size - 1) {
                        Some(i) => i,
                        None => panic!(
                            "index {:?} out of bounds for array of length {:?}",
                            self.size - 1,
                            self.size
                        ),
                    }; //inclusive
                    // Every PE holds at least floor(size / num_pes) elements.
                    let mut num_elems = self.size / num_pes;
                    if self.size % num_pes != 0 {
                        //we have left over elements
                        // Leftovers land on PEs from start_pe through end_pe
                        // (inclusive), possibly wrapping past the last PE.
                        if start_pe <= end_pe {
                            //no wrap around occurs
                            if pe >= start_pe && pe <= end_pe {
                                num_elems += 1
                            }
                        } else {
                            //wrap around occurs
                            if pe >= start_pe || pe <= end_pe {
                                num_elems += 1
                            }
                        }
                    }
                    num_elems
                } else {
                    0
                }
            }
        }
    }
2017 // //#[tracing::instrument(skip_all)]
2018 pub(crate) fn num_elems_local(&self) -> usize {
2019 self.num_elems_pe(self.data.my_pe)
2020 }
2021
2022 // //#[tracing::instrument(skip_all)]
    /// Returns a mutable byte slice over the calling PE's portion of the subarray
    /// within the backing memory region.
    ///
    /// # Safety
    /// Hands out `&mut` access derived from `&self` with no synchronization; the
    /// caller must guarantee no other access (local or remote RDMA) overlaps the
    /// returned slice for its lifetime.
    pub(crate) unsafe fn local_as_mut_slice(&self) -> &mut [u8] {
        let slice =
            self.data.mem_region.as_casted_mut_slice::<u8>().expect(
                "memory doesnt exist on this pe (this should not happen for arrays currently)",
            );
        // let len = self.size;
        let my_pe = self.data.my_pe;
        let num_pes = self.data.num_pes;
        let num_elems_local = self.num_elems_local();
        match self.distribution {
            Distribution::Block => {
                let start_pe = self
                    .pe_for_dist_index(0)
                    .expect("array len should be greater than 0"); //index is relative to inner
                // let end_pe = self.pe_for_dist_index(len-1).unwrap();
                // println!("spe {:?} epe {:?}",start_pe,end_pe);
                // If the subarray begins on this PE, skip the elements of the full
                // array that precede the subarray's offset; otherwise the subarray
                // covers this PE's region from its beginning.
                let start_index = if my_pe == start_pe {
                    //inner starts on my pe
                    let global_start = self.global_start_index_for_pe(my_pe);
                    self.offset - global_start
                } else {
                    0
                };
                let end_index = start_index + num_elems_local;
                // println!(
                //     "nel {:?} sao {:?} as slice si: {:?} ei {:?} elemsize {:?}",
                //     num_elems_local, self.offset, start_index, end_index, self.elem_size
                // );
                // Indices are in elements; the slice is in bytes, hence * elem_size.
                &mut slice[start_index * self.elem_size..end_index * self.elem_size]
            }
            Distribution::Cyclic => {
                // First local slot covering the subarray: divide the offset by the
                // team size, rounding up when this PE comes before the offset's PE
                // in the dealing order.
                let global_index = self.offset;
                let start_index = global_index / num_pes
                    + if my_pe >= global_index % num_pes {
                        0
                    } else {
                        1
                    };
                let end_index = start_index + num_elems_local;
                // println!("si {:?} ei {:?}",start_index,end_index);
                &mut slice[start_index * self.elem_size..end_index * self.elem_size]
            }
        }
    }
2067
2068 // //#[tracing::instrument(skip_all)]
    /// Returns a raw mutable pointer to the first byte of the calling PE's portion
    /// of the subarray within the backing memory region.
    ///
    /// # Safety
    /// The pointer aliases shared memory with no synchronization; the caller must
    /// uphold Rust's aliasing rules for any access through it and keep all accesses
    /// within this PE's portion.
    pub(crate) unsafe fn local_as_mut_ptr(&self) -> *mut u8 {
        let ptr =
            self.data.mem_region.as_casted_mut_ptr::<u8>().expect(
                "memory doesnt exist on this pe (this should not happen for arrays currently)",
            );
        // println!("u8 ptr: {:?}", ptr);
        // let len = self.size;
        let my_pe = self.data.my_pe;
        let num_pes = self.data.num_pes;
        // let num_elems_local = self.num_elems_local();
        match self.distribution {
            Distribution::Block => {
                let start_pe = self
                    .pe_for_dist_index(0)
                    .expect("array len should be greater than 0"); //index is relative to inner
                // let end_pe = self.pe_for_dist_index(len-1).unwrap();
                // println!("spe {:?} epe {:?}",start_pe,end_pe);
                // Same start-offset computation as local_as_mut_slice: skip past any
                // full-array elements preceding the subarray when it starts here.
                let start_index = if my_pe == start_pe {
                    //inner starts on my pe
                    let global_start = self.global_start_index_for_pe(my_pe);
                    self.offset - global_start
                } else {
                    0
                };

                // println!("nel {:?} sao {:?} as slice si: {:?} ei {:?}",num_elems_local,self.offset,start_index,end_index);
                // Element index -> byte offset.
                ptr.offset((start_index * self.elem_size) as isize)
            }
            Distribution::Cyclic => {
                // First local slot covering the subarray's offset (round up when
                // this PE precedes the offset's PE in the dealing order).
                let global_index = self.offset;
                let start_index = global_index / num_pes
                    + if my_pe >= global_index % num_pes {
                        0
                    } else {
                        1
                    };
                // println!("si {:?} ei {:?}",start_index,end_index);
                ptr.offset((start_index * self.elem_size) as isize)
            }
        }
    }
2110
2111 pub(crate) fn barrier_handle(&self) -> BarrierHandle {
2112 self.data.team.barrier.barrier_handle()
2113 }
2114}
2115
#[cfg(test)]
mod tests {
    /// Exhaustively cross-checks the closed-form "which PE owns element `i`"
    /// calculation against an explicitly constructed element->PE assignment,
    /// over a range of team sizes and array lengths.
    #[test]
    fn pe_for_dist_index() {
        for num_pes in 2..200 {
            println!("num_pes {:?}", num_pes);
            for len in num_pes..2000 {
                // Build the oracle: PE `p` owns round((p+1)*epp) - round(p*epp)
                // consecutive elements, dealt out in order.
                let mut elems_per_pe = vec![0; num_pes];
                let mut pe_for_elem = vec![0; len];
                let epp = len as f32 / num_pes as f32;
                let mut cur_elem = 0;
                for pe in 0..num_pes {
                    elems_per_pe[pe] =
                        (((pe + 1) as f32 * epp).round() - (pe as f32 * epp).round()) as usize;
                    for _i in 0..elems_per_pe[pe] {
                        pe_for_elem[cur_elem] = pe;
                        cur_elem += 1;
                    }
                }
                for elem in 0..len {
                    //the actual calculation
                    // Closed form: floor(elem / epp), bumped by one when rounding
                    // placed `elem` just past the candidate PE's last element.
                    let mut calc_pe = (((elem) as f32 / epp).floor()) as usize;
                    let end_i = (epp * (calc_pe + 1) as f32).round() as usize;
                    if elem >= end_i {
                        calc_pe += 1;
                    }
                    //--------------------
                    // Dump diagnostics before the assertion fires on a mismatch.
                    if calc_pe != pe_for_elem[elem] {
                        println!(
                            "npe: {:?} len {:?} e: {:?} eep: {:?} cpe: {:?} ei {:?}",
                            num_pes,
                            len,
                            elem,
                            epp,
                            ((elem) as f32 / epp),
                            end_i
                        );
                        println!("{:?}", elems_per_pe);
                        println!("{:?}", pe_for_elem);
                    }
                    assert_eq!(calc_pe, pe_for_elem[elem]);
                }
            }
        }
    }
}