mcl_rs/registered_buffer.rs
1#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_auto_cfg))]
2use crate::low_level;
3use crate::low_level::ArgOpt;
4use crate::task::{TaskArg, TaskArgData};
5
6use std::collections::{BTreeMap, Bound};
7use std::ffi::c_void;
8use std::ops::RangeBounds;
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11
/// Tracks which byte ranges of a buffer are currently "allocated" (locked by
/// an in-flight task), keyed by start index with exclusive end index as value.
struct Alloc {
    // Total size in bytes of the underlying buffer.
    size: usize,
    // Map of start index -> exclusive end index for each live allocation.
    currently_allocated: BTreeMap<usize, usize>,
}

impl Alloc {
    /// Creates an empty allocator for a buffer of `size` bytes.
    fn new(size: usize) -> Self {
        Alloc {
            size,
            currently_allocated: BTreeMap::new(),
        }
    }

    /// Attempts to reserve the half-open byte range `[start_i, end_i)`.
    ///
    /// Returns `Err(())` if the range overlaps any live allocation so the
    /// caller can retry later (see `inner_alloc`).
    ///
    /// # Panics
    /// Panics if `end_i` exceeds the buffer size.
    fn try_alloc(&mut self, start_i: usize, end_i: usize) -> Result<(), ()> {
        if end_i > self.size {
            panic!("out of bounds error on mcl buffer");
        }
        // An existing allocation starting at or before `start_i` overlaps us if
        // it extends past `start_i`. This also rejects an allocation with the
        // exact same start index. (The previous implementation only inspected
        // allocations starting at or after `start_i`, so a range falling inside
        // an earlier allocation -- e.g. (5,8) inside (0,10) -- was wrongly
        // accepted.)
        if let Some((_, &prev_end)) = self.currently_allocated.range(..=start_i).next_back() {
            if prev_end > start_i {
                return Err(());
            }
        }
        // Any allocation that starts strictly inside `[start_i, end_i)`
        // overlaps us as well.
        if self.currently_allocated.range(start_i..end_i).next().is_some() {
            return Err(());
        }
        // No overlaps -- record the reservation.
        self.currently_allocated.insert(start_i, end_i);
        Ok(())
    }

    /// Releases a previously reserved range.
    ///
    /// # Panics
    /// Panics if `[start_i, end_i)` does not match a live allocation.
    fn free(&mut self, start_i: usize, end_i: usize) {
        if let Some(v) = self.currently_allocated.remove(&start_i) {
            assert_eq!(
                v, end_i,
                "unexpected subbuffer end index: {start_i} {v} {end_i}"
            )
        } else {
            panic!("unexpected subbuffer start index: {start_i}")
        }
    }
}
74
/// Shared bookkeeping for a (sub-)buffer handle: reference counting plus the
/// allocator state used to serialize use of overlapping sub-buffer regions.
struct BufferMetaData {
    // Size in bytes of the element type the buffer was created with.
    orig_type_size: usize,
    // Byte offset of this handle into the original buffer.
    offset: usize,
    // Length in bytes of the region this handle covers.
    len: usize,
    // Reference count shared by all clones/sub-buffers of the original buffer.
    cnt: Arc<AtomicUsize>,
    // Allocator tracking sub-buffers carved out of *this* handle.
    my_alloc: Arc<Mutex<Alloc>>,
    // Allocator of the parent handle when this is a sub-buffer; `None` for
    // the original (top-level) buffer.
    parent_alloc: Option<Arc<Mutex<Alloc>>>,
    // Whether this handle currently holds its range in the relevant allocator.
    alloced: Arc<AtomicBool>,
}
84
85/// Represents an MCL registered buffer, which is essentially a pointer
86/// to data which exists in device Resident memory. This allows multiple
87/// tasks to use the same buffer. Further, we support creating sub-buffers
88/// of a registered buffer to alleviate some of the overhead associated with
89/// creating new buffers.
90///
/// # Safety
/// Given that RegisteredBuffers can be used by multiple tasks simultaneously, and
/// that accelerators are often multi-threaded, we need to ensure that RegisteredBuffers
/// are safe with respect to read and write access.
///
/// Internal to each RegisteredBuffer is an "allocator" which keeps track of sub-buffers
/// that have been created, so that it is not possible to simultaneously have two sub-buffers
/// that overlap one another and potentially modify the overlapping contents.
99///
100/// A task will delay executing, until it is able to "allocate" its subbuffer.
101///
/// Currently this is a mutually exclusive allocation, regardless of whether the sub buffer
/// only requires read access vs write access. This unfortunately serializes read-only tasks
/// using overlapping regions of the RegisteredBuffer. We are working on a better allocator
/// to relax this restriction.
106///
/// Finally, RegisteredBuffers are reference-counted objects, and will automatically free the acquired MCL resources
/// once the last reference is dropped.
109///
pub struct RegisteredBuffer<'a> {
    // The underlying task argument holding the host buffer and its flags.
    data: TaskArg<'a>,
    // Shared reference-count / allocator bookkeeping for this handle.
    meta: BufferMetaData,
}
114
115impl<'a> Clone for RegisteredBuffer<'a> {
116 fn clone(&self) -> Self {
117 self.meta.cnt.fetch_add(1, Ordering::SeqCst);
118 RegisteredBuffer {
119 data: self.data.clone(),
120 meta: BufferMetaData {
121 orig_type_size: self.meta.orig_type_size,
122 offset: self.meta.offset,
123 len: self.meta.len,
124 cnt: self.meta.cnt.clone(),
125 my_alloc: self.meta.my_alloc.clone(),
126 parent_alloc: self.meta.parent_alloc.clone(),
127 alloced: self.meta.alloced.clone(),
128 },
129 }
130 }
131}
132
impl<'a> Drop for RegisteredBuffer<'a> {
    fn drop(&mut self) {
        // Only the final handle (count transitioning 1 -> 0) releases resources.
        if self.meta.cnt.fetch_sub(1, Ordering::SeqCst) == 1 {
            match &self.data.data {
                TaskArgData::Buffer(x) => low_level::unregister_buffer(x),
                _ => unreachable!("Can not have a Registered Buffer that is not a buffer"),
            }
            // If this handle still holds its byte range in an allocator,
            // release it so the shared bookkeeping stays consistent.
            if self.meta.alloced.load(Ordering::SeqCst) {
                if let Some(p_alloc) = &self.meta.parent_alloc {
                    // Sub-buffer: our range lives in the parent's allocator.
                    p_alloc
                        .lock()
                        .unwrap()
                        .free(self.meta.offset, self.meta.offset + self.meta.len);
                } else {
                    // Top-level buffer: the range lives in our own allocator.
                    self.meta
                        .my_alloc
                        .lock()
                        .unwrap()
                        .free(self.meta.offset, self.meta.offset + self.meta.len);
                }
                self.meta.alloced.store(false, Ordering::SeqCst);
            }
        }
    }
}
161
162impl<'a> RegisteredBuffer<'a> {
163 pub(crate) fn new(data: TaskArg<'a>) -> Self {
164 let orig_type_size = data.orig_type_size;
165 let len = data.data.len();
166 // println!("dots: {orig_type_size} len: {len}");
167 match &data.data {
168 TaskArgData::Scalar(_) => panic!("cannot register a scalar"),
169 TaskArgData::Buffer(x) => low_level::register_buffer(x, data.flags),
170 // TaskArgData::Local(_) => panic!("Must register a buffer"),
171 #[cfg(feature = "shared_mem")]
172 TaskArgData::Shared(..) => {
173 panic!("Use shared_buffer the create/attach to a shared buffer")
174 }
175 TaskArgData::Empty => panic!("cannot have an empty arg"),
176 }
177 RegisteredBuffer {
178 data: data,
179 meta: BufferMetaData {
180 orig_type_size: orig_type_size,
181 offset: 0,
182 len: len,
183 cnt: Arc::new(AtomicUsize::new(1)),
184 my_alloc: Arc::new(Mutex::new(Alloc::new(len))),
185 parent_alloc: None,
186 alloced: Arc::new(AtomicBool::new(false)),
187 },
188 }
189 }
190
191 pub(crate) fn base_addr(&self) -> *mut c_void {
192 match &self.data.data {
193 TaskArgData::Buffer(x) => x.as_ptr() as *mut c_void,
194 _ => unreachable!("Can not have a Registered Buffer that is not a buffer"),
195 }
196 }
197
    /// Byte offset of this handle into the original registered buffer.
    pub(crate) fn u8_offset(&self) -> i64 {
        self.meta.offset as i64
    }

    /// Length in bytes of the region this handle covers.
    pub(crate) fn u8_len(&self) -> u64 {
        self.meta.len as u64
    }

    /// The argument flags the buffer was registered with.
    pub(crate) fn flags(&self) -> ArgOpt {
        self.data.flags
    }
209
    ///Return the offset into the original RegisteredBuffer this handle starts at,
    /// in units of the original element type.
    ///
    /// # Examples
    ///```no_run
    /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
    /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
    /// let mut a = vec![0;100];
    /// let buf = mcl.register_buffer(mcl_rs::TaskArg::inout_slice(&mut a)
    ///     .resident(true)
    ///     .dynamic(true),
    /// );
    /// assert_eq!(buf.offset(),0);
    /// let sub_buf = buf.sub_buffer(10..20);
    /// assert_eq!(sub_buf.offset(),10);
    ///```
    pub fn offset(&self) -> usize {
        // `offset` is stored in bytes; convert back to an element count.
        self.meta.offset / self.meta.orig_type_size
    }
228
229 ///Return to len of the Registered(sub)Buffer
230 ///
231 /// # Examples
232 ///```no_run
233 /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
234 /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
235 /// let mut a = vec![0;100];
236 /// let buf = mcl.register_buffer(mcl_rs::TaskArg::inout_slice(&mut a)
237 /// .resident(true)
238 /// .dynamic(true),
239 /// );
240 /// assert_eq!(buf.len(),100);
241 /// let sub_buf = buf.sub_buffer(10..20);
242 /// assert_eq!(sub_buf.len(),10);
243 ///```
244 pub fn len(&self) -> usize {
245 self.meta.len / self.meta.orig_type_size
246 }
247
248 /// Tries to unregistered a previously registered buffer.
249 /// This will only succeed if this is the last reference to the registered buffer.
250 ///
251 /// # Examples
252 ///```no_run
253 /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
254 /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
255 /// let mut a = vec![0;100];
256 /// let buf = mcl.register_buffer(mcl_rs::TaskArg::inout_slice(&mut a)
257 /// .resident(true)
258 /// .dynamic(true),
259 /// );
260 /// let sub_buf = buf.sub_buffer(10..20);
261 /// let buf = buf.try_unregister();
262 /// assert!(buf.is_err());
263 /// let buf = buf.unwrap_err();
264 /// drop(sub_buf);
265 /// assert!(buf.try_unregister().is_ok())
266 ///```
267 pub fn try_unregister(self) -> Result<(), Self> {
268 if let Ok(_) = self
269 .meta
270 .cnt
271 .compare_exchange(1, 0, Ordering::SeqCst, Ordering::SeqCst)
272 {
273 match &self.data.data {
274 TaskArgData::Buffer(x) => low_level::unregister_buffer(x),
275 _ => unreachable!("Can not have a Registered Buffer that is not a buffer"),
276 }
277 Ok(())
278 } else {
279 Err(self)
280 }
281 }
282
283 /// Tries to invalidate a previously registered buffer.
284 /// meaning that the data on the host has changed and needs to be recopied to the device (generally when the buffer is used for input);
285 /// This will only succeed if this is the last reference to the registered buffer.
286 ///
287 /// # Examples
288 ///```no_run
289 /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
290 /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
291 /// let mut a = vec![0;100];
292 /// let buf = mcl.register_buffer(mcl_rs::TaskArg::inout_slice(&mut a)
293 /// .resident(true)
294 /// .dynamic(true),
295 /// );
296 /// let sub_buf = buf.sub_buffer(10..20);
297 /// assert!(!buf.try_invalidate());
298 /// drop(sub_buf);
299 /// assert!(buf.try_invalidate())
300 ///```
301 pub fn try_invalidate(&self) -> bool {
302 if self.meta.cnt.fetch_sub(1, Ordering::SeqCst) == 1 {
303 match &self.data.data {
304 TaskArgData::Buffer(x) => low_level::invalidate_buffer(x),
305 _ => unreachable!("Can not have a Registered Buffer that is not a buffer"),
306 }
307 true
308 } else {
309 false
310 }
311 }
312
313 /// Creates a sub buffer using the provided range from a given RegisteredBuffer
314 /// The sub buffer essentially "locks" the elements in the provided range
315 /// delaying other sub buffers executing with overlapping elements until all references to this sub buffer
316 /// has been dropped.
317 /// Note that sub buffer element locking happens at task execution time rather that sub buffer handle creation.
318 /// This allows overlapping sub buffers be created and passed as arguments to different tasks, with the dependecies
319 /// being handled automatically based on the submission and execution order of the tasks
320 ///
321 /// One can also create sub buffers of sub buffers.
322 ///
323 /// # Examples
324 ///```no_run
325 /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
326 /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
327 /// let mut a = vec![0;100];
328 /// let buf = mcl.register_buffer(mcl_rs::TaskArg::inout_slice(&mut a)
329 /// .resident(true)
330 /// .dynamic(true),
331 /// );
332 /// let sub_buf1 = buf.sub_buffer(10..20);
333 /// let sub_buf2 = buf.sub_buffer(15..25); // this call will succeed even though it overlaps with sub_buf1
334 /// let pes: [u64; 3] = [1, 1, 1];
335 /// let task_1 = mcl.task("my_kernel", 1)
336 /// .arg_buffer(sub_buf1)
337 /// .dev(mcl_rs::DevType::CPU)
338 /// .exec(pes);
339 /// /// We can even create our next task sucessfully with the overlapping buffer because no actual work occurs until we call await
340 /// let task_2 = mcl.task("my_kernel", 1)
341 /// .arg_buffer(sub_buf2)
342 /// .dev(mcl_rs::DevType::CPU)
343 /// .exec(pes);
344 /// // drive both futures simultaneously -- based on the overlapping dependency, these task will in reality be executed serially
345 /// // as the internal implementation will prevent both tasks from allocating the overlapping sub_buffer regions simultaneously
346 /// let task = futures::future::join_all([task_1,task_2]);
347 /// futures::executor::block_on(task);
348 ///
349 ///```
350 pub fn sub_buffer(&self, range: impl RangeBounds<usize>) -> Self {
351 let u8_start = match range.start_bound() {
352 Bound::Included(idx) => idx * self.meta.orig_type_size,
353 Bound::Excluded(idx) => (idx - 1) * self.meta.orig_type_size,
354 Bound::Unbounded => 0,
355 };
356
357 let u8_end = match range.end_bound() {
358 Bound::Included(idx) => (idx + 1) * self.meta.orig_type_size,
359 Bound::Excluded(idx) => idx * self.meta.orig_type_size,
360 Bound::Unbounded => self.meta.len,
361 };
362
363 let len = u8_end - u8_start;
364 let offset = self.meta.offset + u8_start;
365 // println!("{len} {offset} {u8_start} {u8_end}");
366 self.meta.cnt.fetch_add(1, Ordering::SeqCst);
367 RegisteredBuffer {
368 data: self.data.clone(),
369 meta: BufferMetaData {
370 orig_type_size: self.meta.orig_type_size,
371 offset: offset,
372 len: len,
373 cnt: self.meta.cnt.clone(),
374 my_alloc: Arc::new(Mutex::new(Alloc::new(self.data.data.len()))),
375 parent_alloc: Some(self.meta.my_alloc.clone()),
376 alloced: Arc::new(AtomicBool::new(false)),
377 },
378 }
379 }
380
    /// Cooperatively spins (yielding to the async runtime between attempts)
    /// until this handle's byte range can be reserved in `alloc`, then marks
    /// the handle as allocated.
    async fn inner_alloc(&self, alloc: &Mutex<Alloc>) {
        while !self.meta.alloced.load(Ordering::SeqCst) {
            let mut alloc_guard = alloc.lock().unwrap();
            if let Ok(_) = alloc_guard.try_alloc(self.meta.offset, self.meta.offset + self.meta.len)
            {
                self.meta.alloced.store(true, Ordering::SeqCst);
            }
            // Release the lock before yielding so other tasks can allocate/free
            // while we wait (and so no std mutex guard is held across an await).
            drop(alloc_guard);
            async_std::task::yield_now().await;
        }
    }
392
393 pub(crate) async fn alloc(&self) {
394 if let Some(alloc) = self.meta.parent_alloc.as_ref() {
395 //we are a sub array!()
396 self.inner_alloc(alloc).await;
397 } else {
398 //we are not a sub array!
399 self.inner_alloc(&self.meta.my_alloc).await;
400 }
401 }
402}
403
404/// Represents an MCL shared buffer, which is essentially a pointer
405/// to data which exists in shared memory.
406/// When only the `shared_mem` feature is turned on this buffer will exist in host shared memory only.
/// If instead the `pocl_extensions` feature is used, then the buffer will also exist in device shared memory.
/// Note that `pocl_extensions` requires a patched version of POCL 1.8 to have been successfully
/// installed (please see <https://github.com/pnnl/mcl/tree/dev#using-custom-pocl-extensions> for more information).
410///
411/// A shared buffer allows tasks within different processes and applications to use the same buffer.
412/// Further, we support creating sub-buffers of a shared buffers to alleviate some of the overhead associated with
413/// creating new buffers.
414///
/// # Safety
/// Given that Shared Buffers can be used by multiple processes simultaneously, they should
/// always be considered inherently unsafe, as we are only able to provide safety guarantees
/// within a single process. Please see the discussion on safety for [RegisteredBuffer] for details
/// on the protections offered within a single process.
/// Given that Shared Buffers can be used by multiple tasks and processes simultaneously, and
/// that accelerators are often multi-threaded, we try to ensure that Shared Buffers
/// are safe with respect to read and write access.
423///
/// While we are unable to enforce read/write safety guarantees across processes, the MCL library
/// does provide reference counting of the underlying shared memory buffer, and will release the
/// resources once all references across all processes have been dropped.
427///
#[cfg(any(feature = "shared_mem", feature = "pocl_extensions"))]
pub struct SharedMemBuffer {
    // Pointer to the attached shared-memory segment.
    addr: *mut c_void,
    // Total size in bytes of the shared segment.
    size: usize,
    // Argument flags the shared buffer was created/attached with.
    flags: ArgOpt,
    // Shared reference-count / allocator bookkeeping for this handle.
    meta: BufferMetaData,
}
435
#[cfg(any(feature = "shared_mem", feature = "pocl_extensions"))]
impl Clone for SharedMemBuffer {
    /// Bumps the local reference count and returns a new handle over the
    /// same shared-memory segment (same offset/length region).
    fn clone(&self) -> Self {
        // Account for the new handle before constructing it.
        self.meta.cnt.fetch_add(1, Ordering::SeqCst);
        let meta = BufferMetaData {
            orig_type_size: self.meta.orig_type_size,
            offset: self.meta.offset,
            len: self.meta.len,
            cnt: Arc::clone(&self.meta.cnt),
            my_alloc: Arc::clone(&self.meta.my_alloc),
            parent_alloc: self.meta.parent_alloc.clone(),
            alloced: Arc::clone(&self.meta.alloced),
        };
        SharedMemBuffer {
            addr: self.addr,
            size: self.size,
            flags: self.flags,
            meta,
        }
    }
}
456
#[cfg(any(feature = "shared_mem", feature = "pocl_extensions"))]
impl Drop for SharedMemBuffer {
    fn drop(&mut self) {
        // Only the final local handle (count transitioning 1 -> 0) detaches.
        if self.meta.cnt.fetch_sub(1, Ordering::SeqCst) == 1 {
            low_level::detach_shared_buffer(self.addr);
            // If this handle still holds its byte range in an allocator,
            // release it so the shared bookkeeping stays consistent.
            if self.meta.alloced.load(Ordering::SeqCst) {
                if let Some(p_alloc) = &self.meta.parent_alloc {
                    // Sub-buffer: our range lives in the parent's allocator.
                    p_alloc
                        .lock()
                        .unwrap()
                        .free(self.meta.offset, self.meta.offset + self.meta.len);
                } else {
                    // Top-level buffer: the range lives in our own allocator.
                    self.meta
                        .my_alloc
                        .lock()
                        .unwrap()
                        .free(self.meta.offset, self.meta.offset + self.meta.len);
                }
                self.meta.alloced.store(false, Ordering::SeqCst);
            }
        }
    }
}
483
484#[cfg(any(feature = "shared_mem", feature = "pocl_extensions"))]
485impl SharedMemBuffer {
486 pub(crate) fn new(data: TaskArg<'_>) -> Self {
487 let orig_type_size = data.orig_type_size;
488 let (addr, len) = match &data.data {
489 TaskArgData::Scalar(_) => panic!("cannot share a scalar"),
490 TaskArgData::Buffer(_) => panic!("use the TaskArg::*_shared apis instead"),
491 TaskArgData::Local(_) => panic!("cannot not share a local buffer"),
492 TaskArgData::Shared(name, size) => {
493 (low_level::get_shared_buffer(name, *size, data.flags), *size)
494 }
495 TaskArgData::Empty => panic!("cannot have an empty arg"),
496 };
497
498 // println!("SharedMemBuffer size {len}");
499
500 SharedMemBuffer {
501 addr: addr,
502 size: len,
503 flags: data.flags,
504 meta: BufferMetaData {
505 orig_type_size: orig_type_size,
506 offset: 0,
507 len: len,
508 cnt: Arc::new(AtomicUsize::new(1)),
509 my_alloc: Arc::new(Mutex::new(Alloc::new(len))),
510 parent_alloc: None,
511 alloced: Arc::new(AtomicBool::new(false)),
512 },
513 }
514 }
515
    /// Raw pointer to the attached shared-memory segment.
    pub(crate) fn base_addr(&self) -> *mut c_void {
        self.addr
    }

    /// Byte offset of this handle into the original shared buffer.
    pub(crate) fn u8_offset(&self) -> i64 {
        self.meta.offset as i64
    }

    /// Length in bytes of the region this handle covers.
    pub(crate) fn u8_len(&self) -> u64 {
        self.meta.len as u64
    }

    /// The argument flags the shared buffer was created/attached with.
    pub(crate) fn flags(&self) -> ArgOpt {
        self.flags
    }
531
    ///Return the offset into the original SharedMemBuffer this handle starts at,
    /// in units of the original element type.
    ///
    /// # Examples
    ///```no_run
    /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
    /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
    /// let num_elems = 100;
    /// let buf = mcl.create_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
    /// assert_eq!(buf.offset(),0);
    /// let sub_buf = buf.sub_buffer(10..20);
    /// assert_eq!(sub_buf.offset(),10);
    ///```
    pub fn offset(&self) -> usize {
        // `offset` is stored in bytes; convert back to an element count.
        self.meta.offset / self.meta.orig_type_size
    }
547
548 ///Return the len of this (sub)-SharedMemBuffer
549 ///
550 /// # Examples
551 ///```no_run
552 /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
553 /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
554 /// let num_elems = 100;
555 /// let buf = mcl.create_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
556 /// assert_eq!(buf.len(),100);
557 /// let sub_buf = buf.sub_buffer(10..20);
558 /// assert_eq!(sub_buf.len(),10);
559 ///```
560 pub fn len(&self) -> usize {
561 self.meta.len / self.meta.orig_type_size
562 }
563
564 /// Try to manually detach from this shared memory segment (i.e. decrement the global buffer reference count), this will only succeed if this is the last reference locally
565 ///
566 /// NOTE: Dropping a handle potentially calls this automatically provided it is the last local (to this process) reference to the buffer.
567 ///
568 /// # Examples
569 ///```no_run
570 /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
571 /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
572 ///
573 /// let num_elems = 100;
574 /// let buf = mcl.create_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
575 /// let sub_buf = buf.sub_buffer(10..20);
576 ///
577 /// let buf = buf.try_detach();
578 /// assert!(buf.is_err());
579 /// let buf = buf.unwrap_err();
580 /// drop(sub_buf);
581 /// assert!(buf.try_detach().is_ok())
582 ///```
583 pub fn try_detach(self) -> Result<(), Self> {
584 if self.meta.cnt.fetch_sub(1, Ordering::SeqCst) == 1 {
585 low_level::detach_shared_buffer(self.addr);
586 Ok(())
587 } else {
588 Err(self)
589 }
590 }
591
592 /// Creates a sub buffer using the provided range from a given SharedMemBuffer
593 /// The sub buffer essentially "locks" the elements in the provided range
594 /// *BUT ONLY ON THE CALLING PROCESS* (other processes will have no idea of these locked regions)
595 /// delaying other (local to this process) sub buffers from executing with overlapping elements until all references to this sub buffer
596 /// have been dropped.
597 /// Note that sub buffer element locking happens at task execution time rather that sub buffer handle creation.
598 /// This allows overlapping sub buffers be created and passed as arguments to different tasks, with the dependecies
599 /// being handled automatically based on the submission and execution order of the tasks
600 ///
601 /// One can also create sub buffers of sub buffers.
602 ///
603 /// # Examples
604 ///```no_run
605 /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
606 /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
607 ///
608 /// let num_elems = 100;
609 /// let buf = mcl.create_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
610 /// let sub_buf1 = buf.sub_buffer(10..20);
611 /// let sub_buf2 = buf.sub_buffer(15..25); // this call will succeed even though it overlaps with sub_buf1
612 /// let tasks = async move {
613 /// let pes: [u64; 3] = [1, 1, 1]
614 /// let task_1 = mcl.task("my_kernel", 1)
615 /// .arg_buffer(mcl_rs::TaskArg::output_slice(sub_buf1))
616 /// .dev(mcl_rs::DevType::CPU)
617 /// .exec(pes);
618 /// let task_1 = mcl.task("my_kernel", 1)
619 /// .arg_buffer(mcl_rs::TaskArg::output_slice(sub_buf1))
620 /// .dev(mcl_rs::DevType::CPU)
621 /// .exec(pes);
622 /// /// We can even create our next task sucessfully with the overlapping buffer because no actual work occurs until we call await
623 /// let task_2 = mcl.task("my_kernel", 1)
624 /// .arg_buffer(mcl_rs::TaskArg::output_slice(sub_buf1))
625 /// .dev(mcl_rs::DevType::CPU)
626 /// .exec(pes);
627 /// }
628 /// // drive both futures simultaneously -- based on the overlapping dependency, these task will in reality be executed serially
629 /// // as the internal implementation will prevent both tasks from allocating the overlapping sub_buffer regions simultaneously
630 /// futures::future::join_all([task_1,task_2]);
631 /// futures::executor::block_on(task);
632 ///
633 ///```
634 pub fn sub_buffer(&self, range: impl RangeBounds<usize>) -> Self {
635 let u8_start = match range.start_bound() {
636 Bound::Included(idx) => idx * self.meta.orig_type_size,
637 Bound::Excluded(idx) => (idx - 1) * self.meta.orig_type_size,
638 Bound::Unbounded => 0,
639 };
640
641 let u8_end = match range.end_bound() {
642 Bound::Included(idx) => (idx + 1) * self.meta.orig_type_size,
643 Bound::Excluded(idx) => idx * self.meta.orig_type_size,
644 Bound::Unbounded => self.meta.len,
645 };
646
647 let len = u8_end - u8_start;
648 let offset = self.meta.offset + u8_start;
649 // println!("{len} {offset} {u8_start} {u8_end}");
650 self.meta.cnt.fetch_add(1, Ordering::SeqCst);
651 SharedMemBuffer {
652 addr: self.addr.clone(),
653 size: self.size,
654 flags: self.flags.clone(),
655 meta: BufferMetaData {
656 orig_type_size: self.meta.orig_type_size,
657 offset: offset,
658 len: len,
659 cnt: self.meta.cnt.clone(),
660 my_alloc: Arc::new(Mutex::new(Alloc::new(self.size))),
661 parent_alloc: Some(self.meta.my_alloc.clone()),
662 alloced: Arc::new(AtomicBool::new(false)),
663 },
664 }
665 }
666
    /// Extract a T slice from this SharedMemBuffer handle.
    ///
    /// # Safety
    /// This is unsafe as we currently have no mechanism to guarantee the alignment of T with the alignment used to originally create the buffer
    /// potentially in a different process. The user must ensure the alignment is valid otherwise behavior is undefined.
    ///
    /// # Examples
    ///```no_run
    /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
    /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
    ///
    /// let num_elems = 100;
    /// let buf = mcl.attach_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
    /// let sliced = unsafe { buf.as_slice::<u32>()};
    ///```
    pub unsafe fn as_slice<T>(&self) -> &[T] {
        // `meta.len` is in bytes; it must divide evenly into `T`-sized elements.
        assert_eq!(self.meta.len % std::mem::size_of::<T>(),0, "Leftover bytes when tryin to create slice i.e. (buffer len in bytes) % (size of T) != 0");
        // SAFETY (caller-upheld): `addr` points to a live mapping of at least
        // `meta.len` bytes and is suitably aligned for `T`.
        std::slice::from_raw_parts(
            self.addr as *const T,
            self.meta.len / std::mem::size_of::<T>(),
        )
    }
689
690 /// Extract a T slice from this SharedMemBuffer handle.
691 ///
692 /// #Saftey
693 /// This is unsafe as we currently have no mechanism to guarantee the alignment of T with the alignment used to originally create the buffer
694 /// potentially in a different process. The user must ensure the alignment is valid otherwise behavior is undefined.
695 ///
696 /// # Examples
697 ///```no_run
698 /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
699 /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
700 ///
701 /// let num_elems = 100;
702 /// let buf = mcl.attach_shared_buffer(mcl_rs::TaskArg::inout_shared::<u32>("my_buffer", num_elems));
703 /// let slice = unsafe { buf.as_mut_slice::<u32>()};
704 ///```
705 pub unsafe fn as_mut_slice<T>(&self) -> &mut [T] {
706 assert_eq!(self.meta.len % std::mem::size_of::<T>(),0, "Leftover bytes when tryin to create slice i.e. (buffer len in bytes) % (size of T) != 0");
707 std::slice::from_raw_parts_mut(self.addr as *mut T, self.meta.len)
708 }
709
    /// Cooperatively spins (yielding to the async runtime between attempts)
    /// until this handle's byte range can be reserved in `alloc`, then marks
    /// the handle as allocated.
    async fn inner_alloc(&self, alloc: &Mutex<Alloc>) {
        while !self.meta.alloced.load(Ordering::SeqCst) {
            let mut alloc_guard = alloc.lock().unwrap();
            if let Ok(_) = alloc_guard.try_alloc(self.meta.offset, self.meta.offset + self.meta.len)
            {
                self.meta.alloced.store(true, Ordering::SeqCst);
            }
            // Release the lock before yielding so other tasks can allocate/free
            // while we wait (and so no std mutex guard is held across an await).
            drop(alloc_guard);
            async_std::task::yield_now().await;
        }
    }
721
722 pub(crate) async fn alloc(&self) {
723 if let Some(alloc) = self.meta.parent_alloc.as_ref() {
724 //we are a sub array!()
725 self.inner_alloc(alloc).await;
726 } else {
727 //we are not a sub array!
728 self.inner_alloc(&self.meta.my_alloc).await;
729 }
730 }
731}