Skip to main content

vortex_array/
buffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::hash::Hasher;
8use std::ops::Range;
9use std::sync::Arc;
10
11use futures::future::BoxFuture;
12use vortex_buffer::Alignment;
13use vortex_buffer::ByteBuffer;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_utils::dyn_traits::DynEq;
17use vortex_utils::dyn_traits::DynHash;
18
19use crate::ArrayEq;
20use crate::ArrayHash;
21use crate::EqMode;
22
23/// A handle to a buffer allocation.
24///
25/// There are two kinds of buffer allocations supported:
26///
27///   * **host** allocations, which were allocated by the global allocator and reside in main memory
28///   * **device** allocations, which are remote to the CPU and live on a GPU or other external
29///     device.
30///
31/// A device allocation can be copied to the host, yielding a new [`ByteBuffer`] containing the
32/// copied data. Copying can fail at runtime, error recovery is system-dependent.
33#[derive(Debug, Clone)]
34pub struct BufferHandle(Inner);
35
36#[derive(Debug, Clone)]
37enum Inner {
38    /// On the host/cpu.
39    Host(ByteBuffer),
40    /// On the device/gpu.
41    Device(Arc<dyn DeviceBuffer>),
42}
43
44/// A buffer that is stored on the GPU.
45pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {
46    /// Returns a reference as `Any` to enable downcasting.
47    fn as_any(&self) -> &dyn Any;
48
49    /// Returns the length of the buffer in bytes.
50    fn len(&self) -> usize;
51
52    /// Returns the alignment of the buffer.
53    fn alignment(&self) -> Alignment;
54
55    /// Returns true if the buffer is empty.
56    fn is_empty(&self) -> bool {
57        self.len() == 0
58    }
59
60    /// Attempts to copy the device buffer to a host ByteBuffer.
61    ///
62    /// # Errors
63    ///
64    /// This operation may fail, depending on the device implementation and the underlying hardware.
65    fn copy_to_host_sync(&self, alignment: Alignment) -> VortexResult<ByteBuffer>;
66
67    /// Copies the device buffer to a host buffer asynchronously.
68    ///
69    /// Schedules an async copy and returns a future that completes when the copy is finished.
70    ///
71    /// # Arguments
72    ///
73    /// * `alignment` - The memory alignment to use for the host buffer.
74    ///
75    /// # Errors
76    ///
77    /// Returns an error if the async copy operation fails.
78    fn copy_to_host(
79        &self,
80        alignment: Alignment,
81    ) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>>;
82
83    /// Create a new buffer that references a subrange of this buffer at the given
84    /// slice indices.
85    ///
86    /// Note that slice indices are in byte units.
87    fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
88
89    /// Return a buffer with the given alignment. Where possible, this will be zero-copy.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the buffer cannot be aligned (e.g., allocation or copy failure).
94    fn aligned(self: Arc<Self>, alignment: Alignment) -> VortexResult<Arc<dyn DeviceBuffer>>;
95}
96
97pub trait DeviceBufferExt: DeviceBuffer {
98    /// Slice a range of elements `T` out of the device buffer.
99    fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
100}
101
102impl<B: DeviceBuffer> DeviceBufferExt for B {
103    fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer> {
104        let start_bytes = range.start * size_of::<T>();
105        let end_bytes = range.end * size_of::<T>();
106        self.slice(start_bytes..end_bytes)
107    }
108}
109
110impl Hash for dyn DeviceBuffer {
111    fn hash<H: Hasher>(&self, state: &mut H) {
112        self.dyn_hash(state);
113    }
114}
115
116impl PartialEq for dyn DeviceBuffer {
117    fn eq(&self, other: &Self) -> bool {
118        self.dyn_eq(other)
119    }
120}
121impl Eq for dyn DeviceBuffer {}
122
123impl BufferHandle {
124    /// Create a new handle to a host [`ByteBuffer`].
125    pub fn new_host(byte_buffer: ByteBuffer) -> Self {
126        BufferHandle(Inner::Host(byte_buffer))
127    }
128
129    /// Create a new handle to a memory allocation that exists on an external device.
130    ///
131    /// Allocations on external devices are not cheaply accessible from the CPU and most be copied
132    /// into new memory when we read them.
133    pub fn new_device(device: Arc<dyn DeviceBuffer>) -> Self {
134        BufferHandle(Inner::Device(device))
135    }
136}
137
138impl BufferHandle {
139    /// Returns `true` if this buffer resides on the device (GPU).
140    pub fn is_on_device(&self) -> bool {
141        matches!(&self.0, Inner::Device(_))
142    }
143
144    /// Returns `true` if this buffer resides on the host (CPU).
145    pub fn is_on_host(&self) -> bool {
146        matches!(&self.0, Inner::Host(_))
147    }
148
149    /// Gets the size of the buffer, in bytes.
150    pub fn len(&self) -> usize {
151        match &self.0 {
152            Inner::Host(bytes) => bytes.len(),
153            Inner::Device(device) => device.len(),
154        }
155    }
156
157    /// Returns the alignment of the buffer.
158    pub fn alignment(&self) -> Alignment {
159        match &self.0 {
160            Inner::Host(bytes) => bytes.alignment(),
161            Inner::Device(device) => device.alignment(),
162        }
163    }
164
165    /// Returns true if the buffer is aligned to the given alignment.
166    pub fn is_aligned_to(&self, alignment: Alignment) -> bool {
167        self.alignment().is_aligned_to(alignment)
168    }
169
170    /// Ensure the buffer satisfies the requested alignment.
171    ///
172    /// Both host and device buffers will be copied if necessary to satisfy the alignment.
173    pub fn ensure_aligned(self, alignment: Alignment) -> VortexResult<Self> {
174        match self.0 {
175            Inner::Host(buffer) => Ok(BufferHandle::new_host(buffer.aligned(alignment))),
176            Inner::Device(device) => Ok(BufferHandle::new_device(device.aligned(alignment)?)),
177        }
178    }
179
180    /// Check if the buffer is empty.
181    pub fn is_empty(&self) -> bool {
182        self.len() == 0
183    }
184
185    /// Creates a new handle to a subrange of memory at the given bind range.
186    ///
187    ///
188    /// # Example
189    ///
190    /// ```
191    /// # use vortex_array::buffer::BufferHandle;
192    /// # use vortex_buffer::buffer;
193    /// let handle1 = BufferHandle::new_host(buffer![1u8,2,3,4]);
194    /// let handle2 = handle1.slice(1..4);
195    /// assert_eq!(handle2.unwrap_host(), buffer![2u8,3,4]);
196    /// ```
197    pub fn slice(&self, range: Range<usize>) -> Self {
198        match &self.0 {
199            Inner::Host(host) => BufferHandle::new_host(host.slice(range)),
200            Inner::Device(device) => BufferHandle::new_device(device.slice(range)),
201        }
202    }
203
204    /// Reinterpret the pointee as a buffer of `T` and slice the provided element range.
205    ///
206    /// # Example
207    ///
208    /// ```
209    /// # use vortex_array::buffer::BufferHandle;
210    /// # use vortex_buffer::{buffer, Buffer};
211    /// let values = buffer![1u32, 2u32, 3u32, 4u32];
212    /// let handle = BufferHandle::new_host(values.into_byte_buffer());
213    /// let sliced = handle.slice_typed::<u32>(1..4);
214    /// let result = Buffer::<u32>::from_byte_buffer(sliced.to_host_sync());
215    /// assert_eq!(result, buffer![2, 3, 4]);
216    /// ```
217    pub fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Self {
218        let start = range.start * size_of::<T>();
219        let end = range.end * size_of::<T>();
220
221        self.slice(start..end)
222    }
223
224    #[expect(clippy::panic)]
225    /// Unwraps the handle as host memory.
226    ///
227    /// # Panics
228    ///
229    /// This will panic if the handle points to device memory.
230    pub fn unwrap_host(self) -> ByteBuffer {
231        match self.0 {
232            Inner::Host(b) => b,
233            Inner::Device(_) => panic!("unwrap_host called for Device allocation"),
234        }
235    }
236
237    #[expect(clippy::panic)]
238    /// Unwraps the handle as device memory.
239    ///
240    /// # Panics
241    ///
242    /// This will panic if the handle points to host memory.
243    pub fn unwrap_device(self) -> Arc<dyn DeviceBuffer> {
244        match self.0 {
245            Inner::Device(b) => b,
246            Inner::Host(_) => panic!("unwrap_device called for Host allocation"),
247        }
248    }
249
250    /// Downcast this handle as a handle to a host-resident buffer, or `None`.
251    #[inline]
252    pub fn as_host_opt(&self) -> Option<&ByteBuffer> {
253        match &self.0 {
254            Inner::Host(buffer) => Some(buffer),
255            Inner::Device(_) => None,
256        }
257    }
258
259    /// Downcast this handle as a handle to a device buffer, or `None`.
260    pub fn as_device_opt(&self) -> Option<&Arc<dyn DeviceBuffer>> {
261        match &self.0 {
262            Inner::Host(_) => None,
263            Inner::Device(device) => Some(device),
264        }
265    }
266
267    /// A version of [`as_host_opt`][Self::as_host_opt] that panics if the allocation is
268    /// not a host allocation.
269    #[inline]
270    pub fn as_host(&self) -> &ByteBuffer {
271        self.as_host_opt().vortex_expect("expected host buffer")
272    }
273
274    /// A version of [`as_device_opt`][Self::as_device_opt] that panics if the allocation is
275    /// not a device allocation.
276    pub fn as_device(&self) -> &Arc<dyn DeviceBuffer> {
277        self.as_device_opt().vortex_expect("expected device buffer")
278    }
279
280    /// Returns a host-resident copy of the data in the buffer.
281    ///
282    /// If the data was already host-resident, this is trivial.
283    ///
284    /// If the data was device-resident, data will be copied from the device to a new allocation
285    /// on the host.
286    ///
287    /// # Panics
288    ///
289    /// This function will never panic if the data is already host-resident.
290    ///
291    /// For a device-resident handle, any errors triggered by the copying from device to host will
292    /// result in a panic.
293    ///
294    /// See also: [`try_to_host`][Self::try_to_host].
295    pub fn to_host_sync(&self) -> ByteBuffer {
296        self.try_to_host_sync()
297            .vortex_expect("to_host: copy from device to host failed")
298    }
299
300    /// Returns a host-resident copy of the data behind the handle, consuming the handle.
301    ///
302    /// If the data was already host-resident, this completes trivially.
303    ///
304    /// See also [`to_host`][Self::to_host].
305    ///
306    /// # Panics
307    ///
308    /// See the panic documentation on [`to_host`][Self::to_host].
309    pub fn into_host_sync(self) -> ByteBuffer {
310        self.try_into_host_sync()
311            .vortex_expect("into_host: copy from device to host failed")
312    }
313
314    /// Attempts to load this buffer into a host-resident allocation.
315    ///
316    /// If the allocation is already host-resident, this trivially completes with success.
317    ///
318    /// If it is a device allocation, then this issues an operation that attempts to copy the data
319    /// from the device into a host-resident buffer, and returns a handle to that buffer.
320    pub fn try_to_host_sync(&self) -> VortexResult<ByteBuffer> {
321        match &self.0 {
322            Inner::Host(b) => Ok(b.clone()),
323            Inner::Device(device) => device.copy_to_host_sync(Alignment::HOST_COPY),
324        }
325    }
326
327    /// Attempts to load this buffer into a host-resident allocation, consuming the handle.
328    ///
329    /// See also [`try_to_host`][Self::try_to_host].
330    pub fn try_into_host_sync(self) -> VortexResult<ByteBuffer> {
331        match self.0 {
332            Inner::Host(b) => Ok(b),
333            Inner::Device(device) => device.copy_to_host_sync(Alignment::HOST_COPY),
334        }
335    }
336
337    /// Asynchronously copies the buffer to the host.
338    ///
339    /// This is a no-op if the buffer is already on the host.
340    ///
341    /// # Returns
342    ///
343    /// A future that resolves to the host buffer when the copy completes.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if the async copy operation fails.
348    pub fn try_to_host(&self) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>> {
349        match &self.0 {
350            Inner::Host(b) => {
351                let buffer = b.clone();
352                Ok(Box::pin(async move { Ok(buffer) }))
353            }
354            Inner::Device(device) => device.copy_to_host(Alignment::HOST_COPY),
355        }
356    }
357
358    /// Asynchronously copies the buffer to the host, consuming the handle.
359    ///
360    /// This is a no-op if the buffer is already on the host.
361    ///
362    /// # Returns
363    ///
364    /// A future that resolves to the host buffer when the copy completes.
365    ///
366    /// # Errors
367    ///
368    /// Returns an error if the async copy operation fails.
369    pub fn try_into_host(self) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>> {
370        match self.0 {
371            Inner::Host(b) => Ok(Box::pin(async move { Ok(b) })),
372            Inner::Device(device) => device.copy_to_host(Alignment::HOST_COPY),
373        }
374    }
375
376    /// Asynchronously copies the buffer to the host.
377    ///
378    /// # Panics
379    ///
380    /// Any errors triggered by the copying from device to host will result in a panic.
381    pub fn to_host(&self) -> BoxFuture<'static, ByteBuffer> {
382        let future = self
383            .try_to_host()
384            .vortex_expect("to_host: failed to initiate copy from device to host");
385        Box::pin(async move {
386            future
387                .await
388                .vortex_expect("to_host: copy from device to host failed")
389        })
390    }
391
392    /// Asynchronously copies the buffer to the host, consuming the handle.
393    ///
394    /// # Panics
395    ///
396    /// Any errors triggered by the copying from device to host will result in a panic.
397    pub fn into_host(self) -> BoxFuture<'static, ByteBuffer> {
398        let future = self
399            .try_into_host()
400            .vortex_expect("into_host: failed to initiate copy from device to host");
401        Box::pin(async move {
402            future
403                .await
404                .vortex_expect("into_host: copy from device to host failed")
405        })
406    }
407}
408
409impl ArrayHash for BufferHandle {
410    // TODO(aduffy): implement for array hash
411    fn array_hash<H: Hasher>(&self, state: &mut H, accuracy: EqMode) {
412        match &self.0 {
413            Inner::Host(host) => host.array_hash(state, accuracy),
414            Inner::Device(dev) => match accuracy {
415                EqMode::Ptr => {
416                    Arc::as_ptr(dev).hash(state);
417                }
418                EqMode::Value => {
419                    dev.hash(state);
420                }
421            },
422        }
423    }
424}
425
426impl ArrayEq for BufferHandle {
427    fn array_eq(&self, other: &Self, accuracy: EqMode) -> bool {
428        match (&self.0, &other.0) {
429            (Inner::Host(b), Inner::Host(b2)) => b.array_eq(b2, accuracy),
430            (Inner::Device(b), Inner::Device(b2)) => match accuracy {
431                EqMode::Ptr => Arc::ptr_eq(b, b2),
432                EqMode::Value => b.eq(b2),
433            },
434            _ => false,
435        }
436    }
437}