mcl_rs/
registered_buffer.rs

1#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_auto_cfg))]
2use crate::low_level;
3use crate::low_level::ArgOpt;
4use crate::task::{TaskArg, TaskArgData};
5
6use std::collections::{BTreeMap, Bound};
7use std::ffi::c_void;
8use std::ops::RangeBounds;
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11
/// Bookkeeping for the byte ranges of a registered buffer that are
/// currently "allocated" (i.e. in use by an executing task / live sub-buffer).
///
/// Ranges are stored as `start -> end` (end exclusive) in a `BTreeMap` so the
/// neighboring allocations of a candidate range can be found in O(log n).
struct Alloc {
    // Total size (in bytes) of the underlying buffer; requests past this panic.
    size: usize,
    // Map from allocation start index to its (exclusive) end index.
    currently_allocated: BTreeMap<usize, usize>,
}

impl Alloc {
    /// Creates an empty allocator for a buffer of `size` bytes.
    fn new(size: usize) -> Self {
        Alloc {
            size,
            currently_allocated: BTreeMap::new(),
        }
    }

    /// Attempts to reserve the half-open byte range `[start_i, end_i)`.
    ///
    /// Returns `Ok(())` (and records the allocation) if the range overlaps no
    /// existing allocation, `Err(())` otherwise.
    ///
    /// # Panics
    /// Panics if `end_i` is past the end of the buffer.
    fn try_alloc(&mut self, start_i: usize, end_i: usize) -> Result<(), ()> {
        if end_i > self.size {
            panic!("out of bounds error on mcl buffer");
        }
        // Something already allocated starting at the same index.
        if self.currently_allocated.contains_key(&start_i) {
            return Err(());
        }
        // An allocation that starts strictly before `start_i` overlaps us iff
        // it ends after `start_i`.
        // BUGFIX: the previous implementation searched `range(start_i..end_i)`
        // for this check, which never sees an allocation starting below
        // `start_i`, so e.g. alloc(0, 50) followed by alloc(10, 20) both
        // succeeded even though they overlap.
        if let Some((_, &prev_end)) = self.currently_allocated.range(..start_i).next_back() {
            if prev_end > start_i {
                return Err(());
            }
        }
        // Any allocation starting inside [start_i, end_i) overlaps us.
        if self.currently_allocated.range(start_i..end_i).next().is_some() {
            return Err(());
        }
        // No overlaps -- record the allocation.
        self.currently_allocated.insert(start_i, end_i);
        Ok(())
    }

    /// Releases a previously recorded allocation `[start_i, end_i)`.
    ///
    /// # Panics
    /// Panics if no allocation starts at `start_i`, or if the recorded end does
    /// not match `end_i` (both indicate internal bookkeeping bugs).
    fn free(&mut self, start_i: usize, end_i: usize) {
        if let Some(v) = self.currently_allocated.remove(&start_i) {
            assert_eq!(
                v, end_i,
                "unexpected subbuffer end index: {start_i} {v} {end_i}"
            )
        } else {
            panic!("unexpected subbuffer start index: {start_i}")
        }
    }
}
74
/// Shared bookkeeping attached to every (sub-)buffer handle.
struct BufferMetaData {
    // Size in bytes of the element type the buffer was created from.
    orig_type_size: usize,
    // Byte offset of this handle into the original (root) buffer.
    offset: usize,
    // Length in bytes of the region this handle covers.
    len: usize,
    // Reference count shared by all clones/sub-buffers of the same root buffer.
    cnt: Arc<AtomicUsize>,
    // Allocator tracking sub-buffers carved out of *this* handle.
    my_alloc: Arc<Mutex<Alloc>>,
    // Allocator of the parent handle; `Some` only for sub-buffers.
    parent_alloc: Option<Arc<Mutex<Alloc>>>,
    // Whether this handle currently holds an allocation in its (parent's) allocator.
    alloced: Arc<AtomicBool>,
}
84
/// Represents an MCL registered buffer, which is essentially a pointer
/// to data which exists in device resident memory. This allows multiple
/// tasks to use the same buffer. Further, we support creating sub-buffers
/// of a registered buffer to alleviate some of the overhead associated with
/// creating new buffers.
///
/// # Safety
/// Given that RegisteredBuffers can be used by multiple tasks simultaneously, and
/// that accelerators are often multi-threaded, we need to ensure that RegisteredBuffers
/// are safe with respect to read and write access.
///
/// Internal to each RegisteredBuffer is an "allocator" which keeps track of sub-buffers
/// that have been created, so that it is not possible to simultaneously have two sub-buffers
/// that overlap one another and potentially modify the overlapping contents.
///
/// A task will delay executing until it is able to "allocate" its sub-buffer.
///
/// Currently this is a mutually exclusive allocation, regardless of whether the sub-buffer
/// only requires read access vs write access. This unfortunately serializes read-only tasks
/// using overlapping regions of the RegisteredBuffer. We are working on a better allocator
/// to relax this restriction.
///
/// Finally, RegisteredBuffers are reference counted objects, and will automatically free the acquired MCL resources
/// once the last reference is dropped.
///
pub struct RegisteredBuffer<'a> {
    // The underlying task argument (must be `TaskArgData::Buffer`).
    data: TaskArg<'a>,
    // Shared reference-counting / sub-buffer bookkeeping.
    meta: BufferMetaData,
}
114
115impl<'a> Clone for RegisteredBuffer<'a> {
116    fn clone(&self) -> Self {
117        self.meta.cnt.fetch_add(1, Ordering::SeqCst);
118        RegisteredBuffer {
119            data: self.data.clone(),
120            meta: BufferMetaData {
121                orig_type_size: self.meta.orig_type_size,
122                offset: self.meta.offset,
123                len: self.meta.len,
124                cnt: self.meta.cnt.clone(),
125                my_alloc: self.meta.my_alloc.clone(),
126                parent_alloc: self.meta.parent_alloc.clone(),
127                alloced: self.meta.alloced.clone(),
128            },
129        }
130    }
131}
132
133impl<'a> Drop for RegisteredBuffer<'a> {
134    fn drop(&mut self) {
135        // println!("dropping {} {} {}",self.meta.offset,self.meta.len,self.meta.cnt.load(Ordering::SeqCst));
136        if self.meta.cnt.fetch_sub(1, Ordering::SeqCst) == 1 {
137            match &self.data.data {
138                TaskArgData::Buffer(x) => low_level::unregister_buffer(x),
139                _ => unreachable!("Can not have a Registered Buffer that is not a buffer"),
140            }
141            if self.meta.alloced.load(Ordering::SeqCst) {
142                if let Some(p_alloc) = &self.meta.parent_alloc {
143                    //we are a sub array!
144                    p_alloc
145                        .lock()
146                        .unwrap()
147                        .free(self.meta.offset, self.meta.offset + self.meta.len);
148                } else {
149                    //we are not a sub array
150                    self.meta
151                        .my_alloc
152                        .lock()
153                        .unwrap()
154                        .free(self.meta.offset, self.meta.offset + self.meta.len);
155                }
156                self.meta.alloced.store(false, Ordering::SeqCst);
157            }
158        }
159    }
160}
161
162impl<'a> RegisteredBuffer<'a> {
163    pub(crate) fn new(data: TaskArg<'a>) -> Self {
164        let orig_type_size = data.orig_type_size;
165        let len = data.data.len();
166        // println!("dots: {orig_type_size} len: {len}");
167        match &data.data {
168            TaskArgData::Scalar(_) => panic!("cannot register a scalar"),
169            TaskArgData::Buffer(x) => low_level::register_buffer(x, data.flags),
170            // TaskArgData::Local(_) => panic!("Must register a buffer"),
171            #[cfg(feature = "shared_mem")]
172            TaskArgData::Shared(..) => {
173                panic!("Use shared_buffer the create/attach to a shared buffer")
174            }
175            TaskArgData::Empty => panic!("cannot have an empty arg"),
176        }
177        RegisteredBuffer {
178            data: data,
179            meta: BufferMetaData {
180                orig_type_size: orig_type_size,
181                offset: 0,
182                len: len,
183                cnt: Arc::new(AtomicUsize::new(1)),
184                my_alloc: Arc::new(Mutex::new(Alloc::new(len))),
185                parent_alloc: None,
186                alloced: Arc::new(AtomicBool::new(false)),
187            },
188        }
189    }
190
191    pub(crate) fn base_addr(&self) -> *mut c_void {
192        match &self.data.data {
193            TaskArgData::Buffer(x) => x.as_ptr() as *mut c_void,
194            _ => unreachable!("Can not have a Registered Buffer that is not a buffer"),
195        }
196    }
197
    /// Byte offset of this handle into the original (root) buffer.
    pub(crate) fn u8_offset(&self) -> i64 {
        self.meta.offset as i64
    }

    /// Length in bytes of the region this handle covers.
    pub(crate) fn u8_len(&self) -> u64 {
        self.meta.len as u64
    }

    /// The argument flags this buffer was registered with.
    pub(crate) fn flags(&self) -> ArgOpt {
        self.data.flags
    }
209
    /// Return the offset (in elements) into the original RegisteredBuffer this handle starts at.
    ///
    /// # Examples
    ///```no_run
    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
    ///     let mut a = vec![0;100];
    ///     let buf = mcl.register_buffer(mcl_rs::TaskArg::inout_slice(&mut a)
    ///            .resident(true)
    ///            .dynamic(true),
    ///     );
    ///     assert_eq!(buf.offset(),0);
    ///     let sub_buf = buf.sub_buffer(10..20);
    ///     assert_eq!(sub_buf.offset(),10);
    ///```
    pub fn offset(&self) -> usize {
        // `offset` is stored in bytes internally; convert back to elements.
        self.meta.offset / self.meta.orig_type_size
    }
228
    /// Return the length (in elements) of the Registered(sub)Buffer.
    ///
    /// # Examples
    ///```no_run
    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
    ///     let mut a = vec![0;100];
    ///     let buf = mcl.register_buffer(mcl_rs::TaskArg::inout_slice(&mut a)
    ///            .resident(true)
    ///            .dynamic(true),
    ///     );
    ///     assert_eq!(buf.len(),100);
    ///     let sub_buf = buf.sub_buffer(10..20);
    ///     assert_eq!(sub_buf.len(),10);
    ///```
    pub fn len(&self) -> usize {
        // `len` is stored in bytes internally; convert back to elements.
        self.meta.len / self.meta.orig_type_size
    }
247
248    /// Tries to unregistered a previously registered buffer.
249    /// This will only succeed if this is the last reference to the registered buffer.
250    ///
251    /// # Examples
252    ///```no_run
253    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
254    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
255    ///     let mut a = vec![0;100];
256    ///     let buf = mcl.register_buffer(mcl_rs::TaskArg::inout_slice(&mut a)
257    ///            .resident(true)
258    ///            .dynamic(true),
259    ///     );
260    ///     let sub_buf = buf.sub_buffer(10..20);
261    ///     let buf = buf.try_unregister();
262    ///     assert!(buf.is_err());
263    ///     let buf = buf.unwrap_err();
264    ///     drop(sub_buf);
265    ///     assert!(buf.try_unregister().is_ok())
266    ///```
267    pub fn try_unregister(self) -> Result<(), Self> {
268        if let Ok(_) = self
269            .meta
270            .cnt
271            .compare_exchange(1, 0, Ordering::SeqCst, Ordering::SeqCst)
272        {
273            match &self.data.data {
274                TaskArgData::Buffer(x) => low_level::unregister_buffer(x),
275                _ => unreachable!("Can not have a Registered Buffer that is not a buffer"),
276            }
277            Ok(())
278        } else {
279            Err(self)
280        }
281    }
282
283    /// Tries to invalidate a previously registered buffer.
284    /// meaning that the data on the host has changed and needs to be recopied to the device (generally when the buffer is used for input);
285    /// This will only succeed if this is the last reference to the registered buffer.
286    ///
287    /// # Examples
288    ///```no_run
289    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
290    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
291    ///     let mut a = vec![0;100];
292    ///     let buf = mcl.register_buffer(mcl_rs::TaskArg::inout_slice(&mut a)
293    ///            .resident(true)
294    ///            .dynamic(true),
295    ///     );
296    ///     let sub_buf = buf.sub_buffer(10..20);
297    ///     assert!(!buf.try_invalidate());
298    ///     drop(sub_buf);
299    ///     assert!(buf.try_invalidate())
300    ///```
301    pub fn try_invalidate(&self) -> bool {
302        if self.meta.cnt.fetch_sub(1, Ordering::SeqCst) == 1 {
303            match &self.data.data {
304                TaskArgData::Buffer(x) => low_level::invalidate_buffer(x),
305                _ => unreachable!("Can not have a Registered Buffer that is not a buffer"),
306            }
307            true
308        } else {
309            false
310        }
311    }
312
313    /// Creates a sub buffer using the provided range from a given RegisteredBuffer
314    /// The sub buffer essentially "locks" the elements in the provided range
315    /// delaying other sub buffers executing with overlapping elements until all references to this sub buffer
316    /// has been dropped.
317    /// Note that sub buffer element locking happens at task execution time rather that sub buffer handle creation.
318    /// This allows overlapping sub buffers be created and passed as arguments to different tasks, with the dependecies
319    /// being handled automatically based on the submission and execution order of the tasks
320    ///
321    /// One can also create sub buffers of sub buffers.
322    ///
323    /// # Examples
324    ///```no_run
325    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
326    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
327    ///     let mut a = vec![0;100];
328    ///     let buf = mcl.register_buffer(mcl_rs::TaskArg::inout_slice(&mut a)
329    ///            .resident(true)
330    ///            .dynamic(true),
331    ///     );
332    ///     let sub_buf1 = buf.sub_buffer(10..20);
333    ///     let sub_buf2 = buf.sub_buffer(15..25); // this call will succeed even though it overlaps with sub_buf1
334    ///     let pes: [u64; 3] = [1, 1, 1];
335    ///     let task_1 = mcl.task("my_kernel", 1)
336    ///             .arg_buffer(sub_buf1)
337    ///             .dev(mcl_rs::DevType::CPU)
338    ///             .exec(pes);
339    ///     /// We can even create our next task sucessfully with the overlapping buffer because no actual work occurs until we call await
340    ///     let task_2 = mcl.task("my_kernel", 1)
341    ///             .arg_buffer(sub_buf2)
342    ///             .dev(mcl_rs::DevType::CPU)
343    ///             .exec(pes);
344    ///     // drive both futures simultaneously -- based on the overlapping dependency, these task will in reality be executed serially
345    ///     // as the internal implementation will prevent both tasks from allocating the overlapping sub_buffer regions simultaneously
346    ///     let task = futures::future::join_all([task_1,task_2]);
347    ///     futures::executor::block_on(task);
348    ///     
349    ///```
350    pub fn sub_buffer(&self, range: impl RangeBounds<usize>) -> Self {
351        let u8_start = match range.start_bound() {
352            Bound::Included(idx) => idx * self.meta.orig_type_size,
353            Bound::Excluded(idx) => (idx - 1) * self.meta.orig_type_size,
354            Bound::Unbounded => 0,
355        };
356
357        let u8_end = match range.end_bound() {
358            Bound::Included(idx) => (idx + 1) * self.meta.orig_type_size,
359            Bound::Excluded(idx) => idx * self.meta.orig_type_size,
360            Bound::Unbounded => self.meta.len,
361        };
362
363        let len = u8_end - u8_start;
364        let offset = self.meta.offset + u8_start;
365        // println!("{len} {offset} {u8_start} {u8_end}");
366        self.meta.cnt.fetch_add(1, Ordering::SeqCst);
367        RegisteredBuffer {
368            data: self.data.clone(),
369            meta: BufferMetaData {
370                orig_type_size: self.meta.orig_type_size,
371                offset: offset,
372                len: len,
373                cnt: self.meta.cnt.clone(),
374                my_alloc: Arc::new(Mutex::new(Alloc::new(self.data.data.len()))),
375                parent_alloc: Some(self.meta.my_alloc.clone()),
376                alloced: Arc::new(AtomicBool::new(false)),
377            },
378        }
379    }
380
    /// Spin-style allocation loop: repeatedly try to reserve this handle's
    /// byte range in `alloc`, yielding to the async executor between attempts
    /// so other tasks can make progress (and eventually free the range).
    async fn inner_alloc(&self, alloc: &Mutex<Alloc>) {
        while !self.meta.alloced.load(Ordering::SeqCst) {
            let mut alloc_guard = alloc.lock().unwrap();
            if let Ok(_) = alloc_guard.try_alloc(self.meta.offset, self.meta.offset + self.meta.len)
            {
                self.meta.alloced.store(true, Ordering::SeqCst);
            }
            // Release the lock before yielding so other tasks can allocate/free.
            drop(alloc_guard);
            async_std::task::yield_now().await;
        }
    }
392
393    pub(crate) async fn alloc(&self) {
394        if let Some(alloc) = self.meta.parent_alloc.as_ref() {
395            //we are a sub array!()
396            self.inner_alloc(alloc).await;
397        } else {
398            //we are not a sub array!
399            self.inner_alloc(&self.meta.my_alloc).await;
400        }
401    }
402}
403
/// Represents an MCL shared buffer, which is essentially a pointer
/// to data which exists in shared memory.
/// When only the `shared_mem` feature is turned on this buffer will exist in host shared memory only.
/// If instead the `pocl_extensions` feature is used, the buffer will also exist in device shared memory.
/// Note that `pocl_extensions` requires a patched version of POCL 1.8 to have been successfully
/// installed (please see <https://github.com/pnnl/mcl/tree/dev#using-custom-pocl-extensions> for more information).
///
/// A shared buffer allows tasks within different processes and applications to use the same buffer.
/// Further, we support creating sub-buffers of a shared buffer to alleviate some of the overhead associated with
/// creating new buffers.
///
/// # Safety
/// Given that shared buffers can be used by multiple processes simultaneously they should
/// always be considered inherently unsafe, as we are currently only able to provide safety guarantees
/// within a single process. Please see the discussion on safety for [RegisteredBuffer] for details
/// on the protections offered within a single process.
/// Given that shared buffers can be used by multiple tasks and processes simultaneously, and
/// that accelerators are often multi-threaded, we try to ensure that shared buffers
/// are safe with respect to read and write access.
///
/// While we are unable to enforce read/write safety guarantees across processes, the MCL library
/// does provide reference counting of the underlying shared memory buffer, and will release the
/// resources once all references across all processes have been dropped.
///
#[cfg(any(feature = "shared_mem", feature = "pocl_extensions"))]
pub struct SharedMemBuffer {
    // Pointer to the start of the attached shared memory segment.
    addr: *mut c_void,
    // Total size of the segment in bytes.
    size: usize,
    // Argument flags the buffer was created/attached with.
    flags: ArgOpt,
    // Shared reference-counting / sub-buffer bookkeeping.
    meta: BufferMetaData,
}
435
#[cfg(any(feature = "shared_mem", feature = "pocl_extensions"))]
impl Clone for SharedMemBuffer {
    /// Cloning bumps the local reference count and yields another handle to
    /// the same shared segment.
    fn clone(&self) -> Self {
        // Account for the new handle before constructing it.
        self.meta.cnt.fetch_add(1, Ordering::SeqCst);
        let meta = BufferMetaData {
            orig_type_size: self.meta.orig_type_size,
            offset: self.meta.offset,
            len: self.meta.len,
            cnt: Arc::clone(&self.meta.cnt),
            my_alloc: Arc::clone(&self.meta.my_alloc),
            parent_alloc: self.meta.parent_alloc.clone(),
            alloced: Arc::clone(&self.meta.alloced),
        };
        SharedMemBuffer {
            addr: self.addr,
            size: self.size,
            flags: self.flags,
            meta,
        }
    }
}
456
#[cfg(any(feature = "shared_mem", feature = "pocl_extensions"))]
impl Drop for SharedMemBuffer {
    /// Decrements the local reference count; the final local handle detaches
    /// from the shared segment and releases any outstanding allocation.
    fn drop(&mut self) {
        // `fetch_sub` returns the previous value, so `1` means we were the
        // last local reference.
        if self.meta.cnt.fetch_sub(1, Ordering::SeqCst) != 1 {
            return;
        }
        low_level::detach_shared_buffer(self.addr);
        if self.meta.alloced.load(Ordering::SeqCst) {
            // Sub-buffers release from their parent's allocator; root buffers
            // from their own.
            let alloc = self
                .meta
                .parent_alloc
                .as_ref()
                .unwrap_or(&self.meta.my_alloc);
            alloc
                .lock()
                .unwrap()
                .free(self.meta.offset, self.meta.offset + self.meta.len);
            self.meta.alloced.store(false, Ordering::SeqCst);
        }
    }
}
483
484#[cfg(any(feature = "shared_mem", feature = "pocl_extensions"))]
485impl SharedMemBuffer {
486    pub(crate) fn new(data: TaskArg<'_>) -> Self {
487        let orig_type_size = data.orig_type_size;
488        let (addr, len) = match &data.data {
489            TaskArgData::Scalar(_) => panic!("cannot share a scalar"),
490            TaskArgData::Buffer(_) => panic!("use the TaskArg::*_shared apis instead"),
491            TaskArgData::Local(_) => panic!("cannot not share a local buffer"),
492            TaskArgData::Shared(name, size) => {
493                (low_level::get_shared_buffer(name, *size, data.flags), *size)
494            }
495            TaskArgData::Empty => panic!("cannot have an empty arg"),
496        };
497
498        // println!("SharedMemBuffer size {len}");
499
500        SharedMemBuffer {
501            addr: addr,
502            size: len,
503            flags: data.flags,
504            meta: BufferMetaData {
505                orig_type_size: orig_type_size,
506                offset: 0,
507                len: len,
508                cnt: Arc::new(AtomicUsize::new(1)),
509                my_alloc: Arc::new(Mutex::new(Alloc::new(len))),
510                parent_alloc: None,
511                alloced: Arc::new(AtomicBool::new(false)),
512            },
513        }
514    }
515
    /// Raw pointer to the start of the shared memory segment.
    pub(crate) fn base_addr(&self) -> *mut c_void {
        self.addr
    }

    /// Byte offset of this handle into the original (root) buffer.
    pub(crate) fn u8_offset(&self) -> i64 {
        self.meta.offset as i64
    }

    /// Length in bytes of the region this handle covers.
    pub(crate) fn u8_len(&self) -> u64 {
        self.meta.len as u64
    }

    /// The argument flags this buffer was created/attached with.
    pub(crate) fn flags(&self) -> ArgOpt {
        self.flags
    }
531
    /// Return the offset (in elements) into the original SharedMemBuffer this handle starts at.
    ///
    /// # Examples
    ///```no_run
    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
    ///     let num_elems = 100;
    ///     let buf = mcl.create_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
    ///     assert_eq!(buf.offset(),0);
    ///     let sub_buf = buf.sub_buffer(10..20);
    ///     assert_eq!(sub_buf.offset(),10);
    ///```
    pub fn offset(&self) -> usize {
        // `offset` is stored in bytes internally; convert back to elements.
        self.meta.offset / self.meta.orig_type_size
    }
547
    /// Return the length (in elements) of this (sub)-SharedMemBuffer.
    ///
    /// # Examples
    ///```no_run
    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
    ///     let num_elems = 100;
    ///     let buf = mcl.create_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
    ///     assert_eq!(buf.len(),100);
    ///     let sub_buf = buf.sub_buffer(10..20);
    ///     assert_eq!(sub_buf.len(),10);
    ///```
    pub fn len(&self) -> usize {
        // `len` is stored in bytes internally; convert back to elements.
        self.meta.len / self.meta.orig_type_size
    }
563
564    /// Try to manually detach from this shared memory segment (i.e. decrement the global buffer reference count), this will only succeed if this is the last reference locally
565    ///
566    /// NOTE: Dropping a handle potentially calls this automatically provided it is the last local (to this process) reference to the buffer.
567    ///
568    /// # Examples
569    ///```no_run
570    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
571    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
572    ///
573    ///     let num_elems = 100;
574    ///     let buf = mcl.create_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
575    ///     let sub_buf = buf.sub_buffer(10..20);
576    ///
577    ///     let buf = buf.try_detach();
578    ///     assert!(buf.is_err());
579    ///     let buf = buf.unwrap_err();
580    ///     drop(sub_buf);
581    ///     assert!(buf.try_detach().is_ok())
582    ///```
583    pub fn try_detach(self) -> Result<(), Self> {
584        if self.meta.cnt.fetch_sub(1, Ordering::SeqCst) == 1 {
585            low_level::detach_shared_buffer(self.addr);
586            Ok(())
587        } else {
588            Err(self)
589        }
590    }
591
592    /// Creates a sub buffer using the provided range from a given SharedMemBuffer
593    /// The sub buffer essentially "locks" the elements in the provided range
594    /// *BUT ONLY ON THE CALLING PROCESS* (other processes will have no idea of these locked regions)
595    /// delaying other (local to this process) sub buffers from executing with overlapping elements until all references to this sub buffer
596    /// have been dropped.
597    /// Note that sub buffer element locking happens at task execution time rather that sub buffer handle creation.
598    /// This allows overlapping sub buffers be created and passed as arguments to different tasks, with the dependecies
599    /// being handled automatically based on the submission and execution order of the tasks
600    ///
601    /// One can also create sub buffers of sub buffers.
602    ///
603    /// # Examples
604    ///```no_run
605    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
606    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
607    ///
608    ///     let num_elems = 100;
609    ///     let buf = mcl.create_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
610    ///     let sub_buf1 = buf.sub_buffer(10..20);
611    ///     let sub_buf2 = buf.sub_buffer(15..25); // this call will succeed even though it overlaps with sub_buf1
612    ///     let tasks = async move {
613    ///         let pes: [u64; 3] = [1, 1, 1]
614    ///         let task_1 = mcl.task("my_kernel", 1)
615    ///                 .arg_buffer(mcl_rs::TaskArg::output_slice(sub_buf1))
616    ///                 .dev(mcl_rs::DevType::CPU)
617    ///                 .exec(pes);
618    ///         let task_1 = mcl.task("my_kernel", 1)
619    ///                 .arg_buffer(mcl_rs::TaskArg::output_slice(sub_buf1))
620    ///                 .dev(mcl_rs::DevType::CPU)
621    ///                 .exec(pes);
622    ///         /// We can even create our next task sucessfully with the overlapping buffer because no actual work occurs until we call await
623    ///         let task_2 = mcl.task("my_kernel", 1)
624    ///                 .arg_buffer(mcl_rs::TaskArg::output_slice(sub_buf1))
625    ///                 .dev(mcl_rs::DevType::CPU)
626    ///                 .exec(pes);
627    ///     }
628    ///     // drive both futures simultaneously -- based on the overlapping dependency, these task will in reality be executed serially
629    ///     // as the internal implementation will prevent both tasks from allocating the overlapping sub_buffer regions simultaneously
630    ///     futures::future::join_all([task_1,task_2]);
631    ///     futures::executor::block_on(task);
632    ///     
633    ///```
634    pub fn sub_buffer(&self, range: impl RangeBounds<usize>) -> Self {
635        let u8_start = match range.start_bound() {
636            Bound::Included(idx) => idx * self.meta.orig_type_size,
637            Bound::Excluded(idx) => (idx - 1) * self.meta.orig_type_size,
638            Bound::Unbounded => 0,
639        };
640
641        let u8_end = match range.end_bound() {
642            Bound::Included(idx) => (idx + 1) * self.meta.orig_type_size,
643            Bound::Excluded(idx) => idx * self.meta.orig_type_size,
644            Bound::Unbounded => self.meta.len,
645        };
646
647        let len = u8_end - u8_start;
648        let offset = self.meta.offset + u8_start;
649        // println!("{len} {offset} {u8_start} {u8_end}");
650        self.meta.cnt.fetch_add(1, Ordering::SeqCst);
651        SharedMemBuffer {
652            addr: self.addr.clone(),
653            size: self.size,
654            flags: self.flags.clone(),
655            meta: BufferMetaData {
656                orig_type_size: self.meta.orig_type_size,
657                offset: offset,
658                len: len,
659                cnt: self.meta.cnt.clone(),
660                my_alloc: Arc::new(Mutex::new(Alloc::new(self.size))),
661                parent_alloc: Some(self.meta.my_alloc.clone()),
662                alloced: Arc::new(AtomicBool::new(false)),
663            },
664        }
665    }
666
667    /// Extract a T slice from this SharedMemBuffer handle.
668    ///
669    /// #Saftey
670    /// This is unsafe as we currently have no mechanism to guarantee the alignment of T with the alignment used to originally create the buffer
671    /// potentially in a different process. The user must ensure the alignment is valid otherwise behavior is undefined.
672    ///
673    /// # Examples
674    ///```no_run
675    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
676    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
677    ///
678    ///     let num_elems = 100;
679    ///     let buf = mcl.attach_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
680    ///     let sliced = unsafe { buf.as_slice::<u32>()};
681    ///```
682    pub unsafe fn as_slice<T>(&self) -> &[T] {
683        assert_eq!(self.meta.len % std::mem::size_of::<T>(),0, "Leftover bytes when tryin to create slice i.e. (buffer len in bytes) % (size of T) != 0");
684        std::slice::from_raw_parts(
685            self.addr as *const T,
686            self.meta.len / std::mem::size_of::<T>(),
687        )
688    }
689
690    /// Extract a T slice from this SharedMemBuffer handle.
691    ///
692    /// #Saftey
693    /// This is unsafe as we currently have no mechanism to guarantee the alignment of T with the alignment used to originally create the buffer
694    /// potentially in a different process. The user must ensure the alignment is valid otherwise behavior is undefined.
695    ///
696    /// # Examples
697    ///```no_run
698    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
699    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
700    ///
701    ///     let num_elems = 100;
702    ///     let buf = mcl.attach_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
703    ///     let slice = unsafe { buf.as_mut_slice::<u32>()};
704    ///```
705    pub unsafe fn as_mut_slice<T>(&self) -> &mut [T] {
706        assert_eq!(self.meta.len % std::mem::size_of::<T>(),0, "Leftover bytes when tryin to create slice i.e. (buffer len in bytes) % (size of T) != 0");
707        std::slice::from_raw_parts_mut(self.addr as *mut T, self.meta.len)
708    }
709
    /// Spin-style allocation loop: repeatedly try to reserve this handle's
    /// byte range in `alloc`, yielding to the async executor between attempts
    /// so other tasks can make progress (and eventually free the range).
    async fn inner_alloc(&self, alloc: &Mutex<Alloc>) {
        while !self.meta.alloced.load(Ordering::SeqCst) {
            let mut alloc_guard = alloc.lock().unwrap();
            if let Ok(_) = alloc_guard.try_alloc(self.meta.offset, self.meta.offset + self.meta.len)
            {
                self.meta.alloced.store(true, Ordering::SeqCst);
            }
            // Release the lock before yielding so other tasks can allocate/free.
            drop(alloc_guard);
            async_std::task::yield_now().await;
        }
    }
721
722    pub(crate) async fn alloc(&self) {
723        if let Some(alloc) = self.meta.parent_alloc.as_ref() {
724            //we are a sub array!()
725            self.inner_alloc(alloc).await;
726        } else {
727            //we are not a sub array!
728            self.inner_alloc(&self.meta.my_alloc).await;
729        }
730    }
731}