//! Universal vector-like array storage [`UVec`].
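//!
//! A minimal CPU-side usage sketch (hypothetical, assuming the
//! crate root re-exports [`UVec`]):
//!
//! ```ignore
//! use ulib::UVec;
//!
//! let mut v: UVec<i32> = (0..4).collect(); // FromIterator
//! v[0] = 42;                               // DerefMut -> IndexMut
//! assert_eq!(v.as_ref(), &[42, 1, 2, 3]);  // AsRef gives a CPU slice
//! ```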

use super::*;
use std::sync::Mutex;
use bytemuck::Zeroable;
use std::hash::{ Hash, Hasher };
use std::ops::{ Deref, DerefMut, Index, IndexMut };
use std::fmt;

#[cfg(feature = "cuda")]
use cust::memory::{ DeviceBuffer, DeviceSlice, CopyDestination };
#[cfg(feature = "cuda")]
use cust::context::Context;

/// Universal vector-like array storage.
///
/// `UVec` is thread-safe. Specifically, a read-only reference
/// can be shared across different threads, which makes `UVec`
/// both `Send` and `Sync`. This is nontrivial because a read
/// might schedule a copy across devices.
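///
/// A sketch of shared read-only access (hypothetical usage):
///
/// ```ignore
/// let v: UVec<u32> = (0..1024).collect();
/// std::thread::scope(|s| {
///     for _ in 0..4 {
///         s.spawn(|| {
///             // each thread may trigger at most one locked
///             // cross-device copy; valid reads are lock-free
///             assert_eq!(v.as_ref().len(), 1024);
///         });
///     }
/// });
/// ```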
pub struct UVec<T: UniversalCopy> {
    data_cpu: Option<Box<[T]>>,
    #[cfg(feature = "cuda")]
    data_cuda: [Option<DeviceBuffer<T>>; MAX_NUM_CUDA_DEVICES],
    /// A flag array recording data presence and dirty status.
    /// A `true` entry means the data is valid on that device.
    valid_flag: [bool; MAX_DEVICES],
    /// Read locks for all devices.
    ///
    /// These are not locked for any operation originating from
    /// a write access -- no need, because Rust guarantees the
    /// mutable reference is exclusive.
    ///
    /// They are also not locked for a read-only reference as
    /// long as the device of interest already holds valid data
    /// -- no need, because Rust guarantees no mutation can
    /// happen while a read-only reference is alive.
    ///
    /// A lock is ONLY taken when a cross-device copy must be
    /// launched through a read-only reference. In that case,
    /// the lock is per receiving device.
    read_locks: [Mutex<()>; MAX_DEVICES],
    /// The length of the content.
    size: usize,
    /// The length of the allocated buffer.
    capacity: usize,
}

impl<T: UniversalCopy + fmt::Debug> fmt::Debug for UVec<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let slice = self.as_ref();
        write!(f, "uvec[{}] = [", slice.len())?;
        for (i, e) in slice.iter().enumerate() {
            if i != 0 {
                write!(f, ", ")?;
            }
            if f.alternate() {
                write!(f, "{:#?}", e)?;
            }
            else {
                write!(f, "{:?}", e)?;
            }
        }
        write!(f, "]")
    }
}

impl<T: UniversalCopy> Default for UVec<T> {
    #[inline]
    fn default() -> Self {
        Self {
            data_cpu: None,
            #[cfg(feature = "cuda")]
            data_cuda: Default::default(),
            valid_flag: [false; MAX_DEVICES],
            read_locks: Default::default(),
            size: 0,
            capacity: 0
        }
    }
}

impl<T: UniversalCopy> From<Box<[T]>> for UVec<T> {
    #[inline]
    fn from(b: Box<[T]>) -> UVec<T> {
        let len = b.len();
        let mut valid_flag = [false; MAX_DEVICES];
        valid_flag[Device::CPU.to_id()] = true;
        Self {
            data_cpu: Some(b),
            #[cfg(feature = "cuda")]
            data_cuda: Default::default(),
            valid_flag,
            read_locks: Default::default(),
            size: len,
            capacity: len
        }
    }
}

impl<T: UniversalCopy> From<Vec<T>> for UVec<T> {
    #[inline]
    fn from(v: Vec<T>) -> UVec<T> {
        v.into_boxed_slice().into()
    }
}

impl<T: UniversalCopy> FromIterator<T> for UVec<T> {
    #[inline]
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        Vec::from_iter(iter).into()
    }
}

impl<T: UniversalCopy + Zeroable> UVec<T> {
    /// Private function to allocate space for one device.
    ///
    /// Guaranteed to only modify the buffer and the validity
    /// bit of the specified device.
    /// (This is important for the safety of the read-schedule
    /// interior mutability.)
    #[inline]
    fn alloc_zeroed(&mut self, device: Device) {
        use Device::*;
        match device {
            CPU => {
                use std::alloc;
                self.data_cpu = Some(if self.capacity == 0 {
                    // zero-sized layouts must not be passed to
                    // the global allocator.
                    Vec::new().into_boxed_slice()
                } else { unsafe {
                    let ptr = alloc::alloc_zeroed(
                        alloc::Layout::array::<T>(
                            self.capacity
                        ).unwrap()) as *mut T;
                    // the box must cover the full allocation
                    // (capacity, not size), so that dropping it
                    // deallocates with a matching layout.
                    Box::from_raw(
                        core::ptr::slice_from_raw_parts_mut(
                            ptr, self.capacity))
                    // (could use Box::new_zeroed_slice once stable)
                } });
            },
            #[cfg(feature = "cuda")]
            CUDA(c) => {
                let _context = Context::new(
                    CUDA_DEVICES[c as usize].0).unwrap();
                self.data_cuda[c as usize] =
                    Some(DeviceBuffer::zeroed(self.capacity)
                         .unwrap());
            }
        }
    }
}

#[inline]
unsafe fn alloc_cpu_uninit<T: UniversalCopy>(
    sz: usize
) -> Box<[T]> {
    if sz == 0 {
        // zero-sized layouts must not be passed to the
        // global allocator.
        return Vec::new().into_boxed_slice();
    }
    use std::alloc;
    let ptr = alloc::alloc(alloc::Layout::array::<T>(sz).unwrap())
        as *mut T;
    Box::from_raw(core::ptr::slice_from_raw_parts_mut(ptr, sz))
}

#[cfg(feature = "cuda")]
#[inline]
unsafe fn alloc_cuda_uninit<T: UniversalCopy>(
    sz: usize, dev: u8
) -> DeviceBuffer<T> {
    let _context = Context::new(CUDA_DEVICES[dev as usize].0)
        .unwrap();
    DeviceBuffer::uninitialized(sz).unwrap()
}

impl<T: UniversalCopy> UVec<T> {
    /// Private function to allocate space for one device.
    ///
    /// Guaranteed to only modify the buffer and the validity
    /// bit of the specified device.
    /// (This is important for the safety of the read-schedule
    /// interior mutability.)
    #[inline]
    unsafe fn alloc_uninitialized(&mut self, device: Device) {
        use Device::*;
        match device {
            CPU => {
                self.data_cpu = Some(alloc_cpu_uninit(
                    self.capacity));
            },
            #[cfg(feature = "cuda")]
            CUDA(c) => {
                self.data_cuda[c as usize] = Some(
                    alloc_cuda_uninit(self.capacity, c));
            }
        }
    }

    /// Private function to get one device with valid data.
    #[inline]
    fn device_valid(&self) -> Option<Device> {
        self.valid_flag.iter().enumerate().find(|(_i, v)| **v)
            .map(|(i, _v)| Device::from_id(i))
    }

    /// Schedule a device to make its data available.
    ///
    /// Guaranteed to only modify the buffer and the validity
    /// bit of the specified device.
    /// (This is important for the safety of the read-schedule
    /// interior mutability.)
    #[inline]
    fn schedule_device_read(&mut self, device: Device) {
        if self.valid_flag[device.to_id()] {
            return
        }
        use Device::*;
        let is_none = match device {
            CPU => self.data_cpu.is_none(),
            #[cfg(feature = "cuda")]
            CUDA(c) => self.data_cuda[c as usize].is_none()
        };
        if is_none {
            unsafe { self.alloc_uninitialized(device); }
        }
        let device_valid = self.device_valid().expect("no valid device");
        match (device_valid, device) {
            (CPU, CPU) => {},
            #[cfg(feature = "cuda")]
            (CPU, CUDA(c)) => {
                let c = c as usize;
                self.data_cuda[c].as_mut().unwrap().index(..self.size)
                    .copy_from(
                        &self.data_cpu.as_ref().unwrap()[..self.size]
                    ).unwrap();
            },
            #[cfg(feature = "cuda")]
            (CUDA(c), CPU) => {
                let c = c as usize;
                self.data_cuda[c].as_ref().unwrap().index(..self.size)
                    .copy_to(
                        &mut self.data_cpu.as_mut().unwrap()[..self.size]
                    ).unwrap();
            },
            #[cfg(feature = "cuda")]
            (CUDA(c1), CUDA(c2)) => {
                let (c1, c2) = (c1 as usize, c2 as usize);
                assert_ne!(c1, c2);
                // unsafe is used to access one element mutably.
                // safety is guaranteed by the `assert_ne!` above.
                let c2_mut = unsafe {
                    &mut *(self.data_cuda[c2].as_ref().unwrap()
                           as *const DeviceBuffer<T>
                           as *mut DeviceBuffer<T>)
                };
                self.data_cuda[c1].as_ref().unwrap().index(..self.size)
                    .copy_to(
                        &mut c2_mut.index(..self.size)
                    ).unwrap();
            }
        }
        self.valid_flag[device.to_id()] = true;
    }

    /// Schedule a device to make its data available
    /// THROUGH a read-only reference.
    ///
    /// Acquires a lock if necessary.
    /// If you hold a mutable reference, use the lock-free
    /// `schedule_device_read` instead.
    #[inline]
    fn schedule_device_read_ro(&self, device: Device) {
        if self.valid_flag[device.to_id()] {
            return
        }
        let locked = self.read_locks[device.to_id()]
            .lock().unwrap();
        // safety is guaranteed by the lock, and by the guarantee
        // of `schedule_device_read` that it only writes to fields
        // related to the specified device.
        unsafe {
            (&mut *(self as *const UVec<T> as *mut UVec<T>))
                .schedule_device_read(device);
        }
        drop(locked);
    }

    /// Schedule a device write. This invalidates all other devices.
    #[inline]
    fn schedule_device_write(&mut self, device: Device) {
        if !self.valid_flag[device.to_id()] {
            self.schedule_device_read(device);
        }
        // only this device remains valid.
        self.valid_flag[..].fill(false);
        self.valid_flag[device.to_id()] = true;
    }

    #[inline]
    fn drop_all_buf(&mut self) {
        self.data_cpu = None;
        #[cfg(feature = "cuda")]
        for d in &mut self.data_cuda {
            *d = None;
        }
    }

    #[inline]
    unsafe fn realloc_uninit_nopreserve(&mut self, device: Device) {
        self.drop_all_buf();
        if self.capacity > 10_000_000 {
            clilog::debug!("large realloc: capacity {}",
                           self.capacity);
        }
        self.alloc_uninitialized(device);
        self.valid_flag.fill(false);
        self.valid_flag[device.to_id()] = true;
    }

    #[inline]
    unsafe fn realloc_uninit_preserve(&mut self, device: Device) {
        use Device::*;
        match device {
            CPU => {
                let old = self.data_cpu.take().unwrap();
                self.drop_all_buf();
                self.alloc_uninitialized(device);
                self.data_cpu.as_mut().unwrap()[..self.size]
                    .copy_from_slice(&old[..self.size]);
            },
            #[cfg(feature = "cuda")]
            CUDA(c) => {
                let c = c as usize;
                let old = self.data_cuda[c].take().unwrap();
                self.drop_all_buf();
                self.alloc_uninitialized(device);
                self.data_cuda[c].as_mut().unwrap().index(..self.size)
                    .copy_from(&old.index(..self.size))
                    .unwrap();
            }
        }
        self.valid_flag.fill(false);
        self.valid_flag[device.to_id()] = true;
    }

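    /// Read a single element at `idx` from whichever device
    /// currently holds valid data, without migrating the buffer.
    ///
    /// A minimal sketch (hypothetical usage):
    ///
    /// ```ignore
    /// let v: UVec<i32> = vec![3, 5, 8].into();
    /// assert_eq!(v.get(1), 5);
    /// ```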
    #[inline]
    pub fn get(&self, idx: usize) -> T {
        use Device::*;
        match self.device_valid().unwrap() {
            CPU => self.data_cpu.as_ref().unwrap()[idx],
            #[cfg(feature = "cuda")]
            CUDA(c) => {
                // copy the element into a MaybeUninit buffer and
                // only assume_init after the copy has filled it;
                // assume_init on uninitialized data would be UB.
                let mut ret = std::mem::MaybeUninit::<[T; 1]>::uninit();
                self.data_cuda[c as usize].as_ref().unwrap()
                    .index(idx)
                    .copy_to(unsafe { &mut *ret.as_mut_ptr() })
                    .unwrap();
                unsafe { ret.assume_init() }[0]
            }
        }
    }
}

impl<T: UniversalCopy + Zeroable> UVec<T> {
    /// Create a new zeroed universal vector with a specific size.
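    ///
    /// A minimal sketch (hypothetical usage on CPU):
    ///
    /// ```ignore
    /// let v: UVec<u64> = UVec::new_zeroed(8, Device::CPU);
    /// assert_eq!(v.as_ref(), &[0u64; 8]);
    /// ```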
    #[inline]
    pub fn new_zeroed(size: usize, device: Device) -> UVec<T> {
        let mut v: UVec<T> = Default::default();
        v.size = size;
        v.capacity = size;
        v.alloc_zeroed(device);
        v.valid_flag[device.to_id()] = true;
        v
    }
}

impl<T: UniversalCopy> UVec<T> {
    /// Get the length (size) of this vector.
    #[inline]
    pub fn len(&self) -> usize {
        self.size
    }

    /// Get the capacity of this vector.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// New empty vector (can be used as a placeholder).
    #[inline]
    pub fn new() -> UVec<T> {
        unsafe { Self::new_uninitialized(0, Device::CPU) }
    }

    /// Create a new uninitialized universal vector with a
    /// specific size.
    #[inline]
    pub unsafe fn new_uninitialized(
        size: usize, device: Device
    ) -> UVec<T> {
        let mut v: UVec<T> = Default::default();
        v.size = size;
        v.capacity = size;
        v.alloc_uninitialized(device);
        v.valid_flag[device.to_id()] = true;
        v
    }

    /// Resize the universal vector, but do **not** preserve the
    /// original content.
    /// Any new elements are **uninitialized**.
    ///
    /// If the current capacity is sufficient, we do not need to
    /// reallocate or do anything else; we just mark the desired
    /// device as valid.
    ///
    /// If the current capacity is insufficient, a reallocation
    /// is needed and all current allocations are dropped.
    /// (We maintain the invariant that the allocated buffers of
    /// all devices have the same length, equal to the capacity.)
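    ///
    /// A minimal sketch of the growth behavior (hypothetical
    /// usage; the 1.5x factor comes from the code below):
    ///
    /// ```ignore
    /// let mut v: UVec<f32> = UVec::new();
    /// unsafe { v.resize_uninit_nopreserve(100, Device::CPU); }
    /// assert_eq!(v.len(), 100);
    /// assert_eq!(v.capacity(), 150); // 100 * 1.5
    /// ```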
    #[inline]
    pub unsafe fn resize_uninit_nopreserve(&mut self, size: usize, device: Device) {
        if self.capacity < size {
            self.capacity = (size as f64 * 1.5).round() as usize;
            self.realloc_uninit_nopreserve(device);
        }
        self.size = size;
    }

    /// Resize the universal vector, preserving all the original
    /// content.
    /// Any new elements are **uninitialized**.
    #[inline]
    pub unsafe fn resize_uninit_preserve(&mut self, size: usize, device: Device) {
        if self.size != 0 {
            self.schedule_device_read(device);
        }
        if self.capacity < size {
            self.capacity = (size as f64 * 1.5).round() as usize;
            self.realloc_uninit_preserve(device);
        }
        self.size = size;
        self.valid_flag.fill(false);
        self.valid_flag[device.to_id()] = true;
    }
}

impl<T: UniversalCopy> AsRef<[T]> for UVec<T> {
    /// Get a CPU slice reference.
    ///
    /// This can actually fail (panic): it may have to copy a
    /// GPU value back to the CPU, and that copy may error.
    /// This violates the `as_ref` convention of being cheap and
    /// infallible, but we have no better choice.
    ///
    /// It locks only when a copy is needed.
    #[inline]
    fn as_ref(&self) -> &[T] {
        self.schedule_device_read_ro(Device::CPU);
        &self.data_cpu.as_ref().unwrap()[..self.size]
    }
}

impl<T: UniversalCopy> AsMut<[T]> for UVec<T> {
    /// Get a mutable CPU slice reference.
    ///
    /// This can actually fail (panic): it may have to copy a
    /// GPU value back to the CPU, and that copy may error.
    /// This violates the `as_mut` convention, but we have no
    /// better choice.
    ///
    /// It is lock-free.
    #[inline]
    fn as_mut(&mut self) -> &mut [T] {
        self.schedule_device_write(Device::CPU);
        &mut self.data_cpu.as_mut().unwrap()[..self.size]
    }
}

impl<T: UniversalCopy> Deref for UVec<T> {
    type Target = [T];
    /// `Deref` is implemented for `UVec` to let you use it
    /// transparently as a slice.
    ///
    /// Internally it may fail because it might schedule an
    /// inter-device copy to make the data available on CPU.
    /// But it is thread-safe.
    #[inline]
    fn deref(&self) -> &[T] {
        self.as_ref()
    }
}

impl<T: UniversalCopy> DerefMut for UVec<T> {
    /// `DerefMut` is implemented for `UVec` to let you mutate it
    /// transparently as a slice.
    ///
    /// Internally it may fail because it might schedule an
    /// inter-device copy to make the data available on CPU.
    #[inline]
    fn deref_mut(&mut self) -> &mut [T] {
        self.as_mut()
    }
}

impl<T: UniversalCopy, I> Index<I> for UVec<T> where [T]: Index<I> {
    type Output = <[T] as Index<I>>::Output;
    #[inline]
    fn index(&self, i: I) -> &Self::Output {
        self.as_ref().index(i)
    }
}

impl<T: UniversalCopy, I> IndexMut<I> for UVec<T> where [T]: IndexMut<I> {
    #[inline]
    fn index_mut(&mut self, i: I) -> &mut Self::Output {
        self.as_mut().index_mut(i)
    }
}

#[cfg(feature = "cuda")]
impl<T: UniversalCopy> AsCUDASlice<T> for UVec<T> {
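    /// Get a read-only CUDA device slice, scheduling a copy to
    /// the given device first if needed.
    ///
    /// A minimal sketch (hypothetical usage, assuming the `cuda`
    /// feature and at least one CUDA device):
    ///
    /// ```ignore
    /// let v: UVec<f32> = vec![1.0, 2.0].into();
    /// let d = v.as_cuda_slice(Device::CUDA(0)); // copies on demand
    /// assert_eq!(d.len(), 2);
    /// ```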
    #[inline]
    fn as_cuda_slice(&self, cuda_device: Device) -> DeviceSlice<T> {
        use Device::*;
        let c = match cuda_device {
            CUDA(c) => c as usize,
            _ => panic!("AsCUDASlice does not accept \
                         non-CUDA device {:?}", cuda_device)
        };
        self.schedule_device_read_ro(cuda_device);
        // construct a slice with only the first `size` elements.
        let ptr = self.data_cuda[c].as_ref().unwrap().as_device_ptr();
        unsafe { DeviceSlice::from_raw_parts(ptr, self.size) }
    }
}

#[cfg(feature = "cuda")]
impl<T: UniversalCopy> AsCUDASliceMut<T> for UVec<T> {
    #[inline]
    fn as_cuda_slice_mut(&mut self, cuda_device: Device) -> DeviceSlice<T> {
        use Device::*;
        let c = match cuda_device {
            CUDA(c) => c as usize,
            _ => panic!("AsCUDASliceMut does not accept \
                         non-CUDA device {:?}", cuda_device)
        };
        self.schedule_device_write(cuda_device);
        // construct a slice with only the first `size` elements.
        let ptr = self.data_cuda[c].as_ref().unwrap().as_device_ptr();
        unsafe { DeviceSlice::from_raw_parts(ptr, self.size) }
    }
}

impl<T: UniversalCopy> AsUPtr<T> for UVec<T> {
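    /// Get a read-only raw pointer valid on the given device,
    /// scheduling a copy first if needed.
    ///
    /// A minimal sketch (hypothetical usage on CPU):
    ///
    /// ```ignore
    /// let v: UVec<i32> = vec![7].into();
    /// let p = v.as_uptr(Device::CPU);
    /// assert_eq!(unsafe { *p }, 7);
    /// ```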
    #[inline]
    fn as_uptr(&self, device: Device) -> *const T {
        self.schedule_device_read_ro(device);
        use Device::*;
        match device {
            CPU => self.data_cpu.as_ref().unwrap().as_ptr(),
            #[cfg(feature = "cuda")]
            CUDA(c) => self.data_cuda[c as usize].as_ref().unwrap()
                .as_device_ptr().as_ptr()
        }
    }
}

impl<T: UniversalCopy> AsUPtrMut<T> for UVec<T> {
    #[inline]
    fn as_mut_uptr(&mut self, device: Device) -> *mut T {
        self.schedule_device_write(device);
        use Device::*;
        match device {
            CPU => self.data_cpu.as_mut().unwrap().as_mut_ptr(),
            #[cfg(feature = "cuda")]
            CUDA(c) => self.data_cuda[c as usize].as_mut().unwrap()
                .as_device_ptr().as_mut_ptr()
        }
    }
}

// although convenient, the impls below get in the way of
// automatic type inference.

// impl<T: UniversalCopy, const N: usize> AsUPtr<T> for UVec<[T; N]> {
//     /// convenient way to get a flattened pointer
//     #[inline]
//     fn as_uptr(&self, device: Device) -> *const T {
//         AsUPtr::<[T; N]>::as_uptr(self, device) as *const T
//     }
// }

// impl<T: UniversalCopy, const N: usize> AsUPtrMut<T> for UVec<[T; N]> {
//     /// convenient way to get a flattened pointer
//     #[inline]
//     fn as_mut_uptr(&mut self, device: Device) -> *mut T {
//         AsUPtrMut::<[T; N]>::as_mut_uptr(self, device) as *mut T
//     }
// }

impl<T, U: UniversalCopy> AsUPtr<U> for &T where T: AsUPtr<U> {
    #[inline]
    fn as_uptr(&self, device: Device) -> *const U {
        (*self).as_uptr(device)
    }
}

impl<T, U: UniversalCopy> AsUPtrMut<U> for &mut T where T: AsUPtrMut<U> {
    #[inline]
    fn as_mut_uptr(&mut self, device: Device) -> *mut U {
        (*self).as_mut_uptr(device)
    }
}

impl<T: UniversalCopy + Hash> Hash for UVec<T> {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_ref().hash(state)
    }
}

impl<T: UniversalCopy, U: UniversalCopy> PartialEq<UVec<U>> for UVec<T>
    where T: PartialEq<U>
{
    #[inline]
    fn eq(&self, other: &UVec<U>) -> bool {
        self.as_ref() == other.as_ref()
    }
}

impl<T: UniversalCopy + Eq> Eq for UVec<T> { }

impl<T: UniversalCopy> Clone for UVec<T> {
    fn clone(&self) -> Self {
        let valid_flag = self.valid_flag.clone();
        let data_cpu = match valid_flag[Device::CPU.to_id()] {
            true => self.data_cpu.clone(),
            false => None
        };
        #[cfg(feature = "cuda")]
        let data_cuda = unsafe {
            let mut data_cuda: [Option<DeviceBuffer<T>>; MAX_NUM_CUDA_DEVICES] = Default::default();
            for i in 0..MAX_NUM_CUDA_DEVICES {
                if valid_flag[Device::CUDA(i as u8).to_id()] {
                    let dbuf = alloc_cuda_uninit(self.capacity, i as u8);
                    self.data_cuda[i].as_ref().unwrap().index(..self.size)
                        .copy_to(&mut dbuf.index(..self.size))
                        .unwrap();
                    data_cuda[i] = Some(dbuf);
                }
            }
            data_cuda
        };
        UVec {
            data_cpu,
            #[cfg(feature = "cuda")] data_cuda,
            valid_flag,
            read_locks: Default::default(),
            size: self.size,
            capacity: self.capacity
        }
    }
}