Skip to main content

vortex_array/
buffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::hash::Hasher;
8use std::ops::Range;
9use std::sync::Arc;
10
11use futures::future::BoxFuture;
12use vortex_buffer::ALIGNMENT_TO_HOST_COPY;
13use vortex_buffer::Alignment;
14use vortex_buffer::ByteBuffer;
15use vortex_error::VortexExpect;
16use vortex_error::VortexResult;
17use vortex_utils::dyn_traits::DynEq;
18use vortex_utils::dyn_traits::DynHash;
19
20use crate::ArrayEq;
21use crate::ArrayHash;
22use crate::Precision;
23
24/// A handle to a buffer allocation.
25///
26/// There are two kinds of buffer allocations supported:
27///
28///   * **host** allocations, which were allocated by the global allocator and reside in main memory
29///   * **device** allocations, which are remote to the CPU and live on a GPU or other external
30///     device.
31///
32/// A device allocation can be copied to the host, yielding a new [`ByteBuffer`] containing the
33/// copied data. Copying can fail at runtime, error recovery is system-dependent.
34#[derive(Debug, Clone)]
35pub struct BufferHandle(Inner);
36
37#[derive(Debug, Clone)]
38enum Inner {
39    /// On the host/cpu.
40    Host(ByteBuffer),
41    /// On the device/gpu.
42    Device(Arc<dyn DeviceBuffer>),
43}
44
45/// A buffer that is stored on the GPU.
46pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {
47    /// Returns a reference as `Any` to enable downcasting.
48    fn as_any(&self) -> &dyn Any;
49
50    /// Returns the length of the buffer in bytes.
51    fn len(&self) -> usize;
52
53    /// Returns the alignment of the buffer.
54    fn alignment(&self) -> Alignment;
55
56    /// Returns true if the buffer is empty.
57    fn is_empty(&self) -> bool {
58        self.len() == 0
59    }
60
61    /// Attempts to copy the device buffer to a host ByteBuffer.
62    ///
63    /// # Errors
64    ///
65    /// This operation may fail, depending on the device implementation and the underlying hardware.
66    fn copy_to_host_sync(&self, alignment: Alignment) -> VortexResult<ByteBuffer>;
67
68    /// Copies the device buffer to a host buffer asynchronously.
69    ///
70    /// Schedules an async copy and returns a future that completes when the copy is finished.
71    ///
72    /// # Arguments
73    ///
74    /// * `alignment` - The memory alignment to use for the host buffer.
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if the async copy operation fails.
79    fn copy_to_host(
80        &self,
81        alignment: Alignment,
82    ) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>>;
83
84    /// Create a new buffer that references a subrange of this buffer at the given
85    /// slice indices.
86    ///
87    /// Note that slice indices are in byte units.
88    fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
89
90    /// Return a buffer with the given alignment. Where possible, this will be zero-copy.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the buffer cannot be aligned (e.g., allocation or copy failure).
95    fn aligned(self: Arc<Self>, alignment: Alignment) -> VortexResult<Arc<dyn DeviceBuffer>>;
96}
97
98pub trait DeviceBufferExt: DeviceBuffer {
99    /// Slice a range of elements `T` out of the device buffer.
100    fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
101}
102
103impl<B: DeviceBuffer> DeviceBufferExt for B {
104    fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer> {
105        let start_bytes = range.start * size_of::<T>();
106        let end_bytes = range.end * size_of::<T>();
107        self.slice(start_bytes..end_bytes)
108    }
109}
110
111impl Hash for dyn DeviceBuffer {
112    fn hash<H: Hasher>(&self, state: &mut H) {
113        self.dyn_hash(state);
114    }
115}
116
117impl PartialEq for dyn DeviceBuffer {
118    fn eq(&self, other: &Self) -> bool {
119        self.dyn_eq(other)
120    }
121}
122impl Eq for dyn DeviceBuffer {}
123
124impl BufferHandle {
125    /// Create a new handle to a host [`ByteBuffer`].
126    pub fn new_host(byte_buffer: ByteBuffer) -> Self {
127        BufferHandle(Inner::Host(byte_buffer))
128    }
129
130    /// Create a new handle to a memory allocation that exists on an external device.
131    ///
132    /// Allocations on external devices are not cheaply accessible from the CPU and most be copied
133    /// into new memory when we read them.
134    pub fn new_device(device: Arc<dyn DeviceBuffer>) -> Self {
135        BufferHandle(Inner::Device(device))
136    }
137}
138
139impl BufferHandle {
140    /// Returns `true` if this buffer resides on the device (GPU).
141    pub fn is_on_device(&self) -> bool {
142        matches!(&self.0, Inner::Device(_))
143    }
144
145    /// Returns `true` if this buffer resides on the host (CPU).
146    pub fn is_on_host(&self) -> bool {
147        matches!(&self.0, Inner::Host(_))
148    }
149
150    /// Gets the size of the buffer, in bytes.
151    pub fn len(&self) -> usize {
152        match &self.0 {
153            Inner::Host(bytes) => bytes.len(),
154            Inner::Device(device) => device.len(),
155        }
156    }
157
158    /// Returns the alignment of the buffer.
159    pub fn alignment(&self) -> Alignment {
160        match &self.0 {
161            Inner::Host(bytes) => bytes.alignment(),
162            Inner::Device(device) => device.alignment(),
163        }
164    }
165
166    /// Returns true if the buffer is aligned to the given alignment.
167    pub fn is_aligned_to(&self, alignment: Alignment) -> bool {
168        self.alignment().is_aligned_to(alignment)
169    }
170
171    /// Ensure the buffer satisfies the requested alignment.
172    ///
173    /// Both host and device buffers will be copied if necessary to satisfy the alignment.
174    pub fn ensure_aligned(self, alignment: Alignment) -> VortexResult<Self> {
175        match self.0 {
176            Inner::Host(buffer) => Ok(BufferHandle::new_host(buffer.aligned(alignment))),
177            Inner::Device(device) => Ok(BufferHandle::new_device(device.aligned(alignment)?)),
178        }
179    }
180
181    /// Check if the buffer is empty.
182    pub fn is_empty(&self) -> bool {
183        self.len() == 0
184    }
185
186    /// Creates a new handle to a subrange of memory at the given bind range.
187    ///
188    ///
189    /// # Example
190    ///
191    /// ```
192    /// # use vortex_array::buffer::BufferHandle;
193    /// # use vortex_buffer::buffer;
194    /// let handle1 = BufferHandle::new_host(buffer![1u8,2,3,4]);
195    /// let handle2 = handle1.slice(1..4);
196    /// assert_eq!(handle2.unwrap_host(), buffer![2u8,3,4]);
197    /// ```
198    pub fn slice(&self, range: Range<usize>) -> Self {
199        match &self.0 {
200            Inner::Host(host) => BufferHandle::new_host(host.slice(range)),
201            Inner::Device(device) => BufferHandle::new_device(device.slice(range)),
202        }
203    }
204
205    /// Reinterpret the pointee as a buffer of `T` and slice the provided element range.
206    ///
207    /// # Example
208    ///
209    /// ```
210    /// # use vortex_array::buffer::BufferHandle;
211    /// # use vortex_buffer::{buffer, Buffer};
212    /// let values = buffer![1u32, 2u32, 3u32, 4u32];
213    /// let handle = BufferHandle::new_host(values.into_byte_buffer());
214    /// let sliced = handle.slice_typed::<u32>(1..4);
215    /// let result = Buffer::<u32>::from_byte_buffer(sliced.to_host_sync());
216    /// assert_eq!(result, buffer![2, 3, 4]);
217    /// ```
218    pub fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Self {
219        let start = range.start * size_of::<T>();
220        let end = range.end * size_of::<T>();
221
222        self.slice(start..end)
223    }
224
225    #[allow(clippy::panic)]
226    /// Unwraps the handle as host memory.
227    ///
228    /// # Panics
229    ///
230    /// This will panic if the handle points to device memory.
231    pub fn unwrap_host(self) -> ByteBuffer {
232        match self.0 {
233            Inner::Host(b) => b,
234            Inner::Device(_) => panic!("unwrap_host called for Device allocation"),
235        }
236    }
237
238    #[allow(clippy::panic)]
239    /// Unwraps the handle as device memory.
240    ///
241    /// # Panics
242    ///
243    /// This will panic if the handle points to host memory.
244    pub fn unwrap_device(self) -> Arc<dyn DeviceBuffer> {
245        match self.0 {
246            Inner::Device(b) => b,
247            Inner::Host(_) => panic!("unwrap_device called for Host allocation"),
248        }
249    }
250
251    /// Downcast this handle as a handle to a host-resident buffer, or `None`.
252    pub fn as_host_opt(&self) -> Option<&ByteBuffer> {
253        match &self.0 {
254            Inner::Host(buffer) => Some(buffer),
255            Inner::Device(_) => None,
256        }
257    }
258
259    /// Downcast this handle as a handle to a device buffer, or `None`.
260    pub fn as_device_opt(&self) -> Option<&Arc<dyn DeviceBuffer>> {
261        match &self.0 {
262            Inner::Host(_) => None,
263            Inner::Device(device) => Some(device),
264        }
265    }
266
267    /// A version of [`as_host_opt`][Self::as_host_opt] that panics if the allocation is
268    /// not a host allocation.
269    pub fn as_host(&self) -> &ByteBuffer {
270        self.as_host_opt().vortex_expect("expected host buffer")
271    }
272
273    /// A version of [`as_device_opt`][Self::as_device_opt] that panics if the allocation is
274    /// not a device allocation.
275    pub fn as_device(&self) -> &Arc<dyn DeviceBuffer> {
276        self.as_device_opt().vortex_expect("expected device buffer")
277    }
278
279    /// Returns a host-resident copy of the data in the buffer.
280    ///
281    /// If the data was already host-resident, this is trivial.
282    ///
283    /// If the data was device-resident, data will be copied from the device to a new allocation
284    /// on the host.
285    ///
286    /// # Panics
287    ///
288    /// This function will never panic if the data is already host-resident.
289    ///
290    /// For a device-resident handle, any errors triggered by the copying from device to host will
291    /// result in a panic.
292    ///
293    /// See also: [`try_to_host`][Self::try_to_host].
294    pub fn to_host_sync(&self) -> ByteBuffer {
295        self.try_to_host_sync()
296            .vortex_expect("to_host: copy from device to host failed")
297    }
298
299    /// Returns a host-resident copy of the data behind the handle, consuming the handle.
300    ///
301    /// If the data was already host-resident, this completes trivially.
302    ///
303    /// See also [`to_host`][Self::to_host].
304    ///
305    /// # Panics
306    ///
307    /// See the panic documentation on [`to_host`][Self::to_host].
308    pub fn into_host_sync(self) -> ByteBuffer {
309        self.try_into_host_sync()
310            .vortex_expect("into_host: copy from device to host failed")
311    }
312
313    /// Attempts to load this buffer into a host-resident allocation.
314    ///
315    /// If the allocation is already host-resident, this trivially completes with success.
316    ///
317    /// If it is a device allocation, then this issues an operation that attempts to copy the data
318    /// from the device into a host-resident buffer, and returns a handle to that buffer.
319    pub fn try_to_host_sync(&self) -> VortexResult<ByteBuffer> {
320        match &self.0 {
321            Inner::Host(b) => Ok(b.clone()),
322            Inner::Device(device) => device.copy_to_host_sync(ALIGNMENT_TO_HOST_COPY),
323        }
324    }
325
326    /// Attempts to load this buffer into a host-resident allocation, consuming the handle.
327    ///
328    /// See also [`try_to_host`][Self::try_to_host].
329    pub fn try_into_host_sync(self) -> VortexResult<ByteBuffer> {
330        match self.0 {
331            Inner::Host(b) => Ok(b),
332            Inner::Device(device) => device.copy_to_host_sync(ALIGNMENT_TO_HOST_COPY),
333        }
334    }
335
336    /// Asynchronously copies the buffer to the host.
337    ///
338    /// This is a no-op if the buffer is already on the host.
339    ///
340    /// # Returns
341    ///
342    /// A future that resolves to the host buffer when the copy completes.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if the async copy operation fails.
347    pub fn try_to_host(&self) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>> {
348        match &self.0 {
349            Inner::Host(b) => {
350                let buffer = b.clone();
351                Ok(Box::pin(async move { Ok(buffer) }))
352            }
353            Inner::Device(device) => device.copy_to_host(ALIGNMENT_TO_HOST_COPY),
354        }
355    }
356
357    /// Asynchronously copies the buffer to the host, consuming the handle.
358    ///
359    /// This is a no-op if the buffer is already on the host.
360    ///
361    /// # Returns
362    ///
363    /// A future that resolves to the host buffer when the copy completes.
364    ///
365    /// # Errors
366    ///
367    /// Returns an error if the async copy operation fails.
368    pub fn try_into_host(self) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>> {
369        match self.0 {
370            Inner::Host(b) => Ok(Box::pin(async move { Ok(b) })),
371            Inner::Device(device) => device.copy_to_host(ALIGNMENT_TO_HOST_COPY),
372        }
373    }
374
375    /// Asynchronously copies the buffer to the host.
376    ///
377    /// # Panics
378    ///
379    /// Any errors triggered by the copying from device to host will result in a panic.
380    pub fn to_host(&self) -> BoxFuture<'static, ByteBuffer> {
381        let future = self
382            .try_to_host()
383            .vortex_expect("to_host: failed to initiate copy from device to host");
384        Box::pin(async move {
385            future
386                .await
387                .vortex_expect("to_host: copy from device to host failed")
388        })
389    }
390
391    /// Asynchronously copies the buffer to the host, consuming the handle.
392    ///
393    /// # Panics
394    ///
395    /// Any errors triggered by the copying from device to host will result in a panic.
396    pub fn into_host(self) -> BoxFuture<'static, ByteBuffer> {
397        let future = self
398            .try_into_host()
399            .vortex_expect("into_host: failed to initiate copy from device to host");
400        Box::pin(async move {
401            future
402                .await
403                .vortex_expect("into_host: copy from device to host failed")
404        })
405    }
406}
407
408impl ArrayHash for BufferHandle {
409    // TODO(aduffy): implement for array hash
410    fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
411        match &self.0 {
412            Inner::Host(host) => host.array_hash(state, precision),
413            Inner::Device(dev) => match precision {
414                Precision::Ptr => {
415                    Arc::as_ptr(dev).hash(state);
416                }
417                Precision::Value => {
418                    dev.hash(state);
419                }
420            },
421        }
422    }
423}
424
425impl ArrayEq for BufferHandle {
426    fn array_eq(&self, other: &Self, precision: Precision) -> bool {
427        match (&self.0, &other.0) {
428            (Inner::Host(b), Inner::Host(b2)) => b.array_eq(b2, precision),
429            (Inner::Device(b), Inner::Device(b2)) => match precision {
430                Precision::Ptr => Arc::ptr_eq(b, b2),
431                Precision::Value => b.eq(b2),
432            },
433            _ => false,
434        }
435    }
436}