Skip to main content

vortex_array/
buffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::hash::Hasher;
8use std::ops::Range;
9use std::sync::Arc;
10
11use futures::future::BoxFuture;
12use vortex_buffer::ALIGNMENT_TO_HOST_COPY;
13use vortex_buffer::Alignment;
14use vortex_buffer::ByteBuffer;
15use vortex_error::VortexExpect;
16use vortex_error::VortexResult;
17use vortex_utils::dyn_traits::DynEq;
18use vortex_utils::dyn_traits::DynHash;
19
20use crate::ArrayEq;
21use crate::ArrayHash;
22use crate::EqMode;
23
24/// A handle to a buffer allocation.
25///
26/// There are two kinds of buffer allocations supported:
27///
28///   * **host** allocations, which were allocated by the global allocator and reside in main memory
29///   * **device** allocations, which are remote to the CPU and live on a GPU or other external
30///     device.
31///
32/// A device allocation can be copied to the host, yielding a new [`ByteBuffer`] containing the
33/// copied data. Copying can fail at runtime, error recovery is system-dependent.
34#[derive(Debug, Clone)]
35pub struct BufferHandle(Inner);
36
37#[derive(Debug, Clone)]
38enum Inner {
39    /// On the host/cpu.
40    Host(ByteBuffer),
41    /// On the device/gpu.
42    Device(Arc<dyn DeviceBuffer>),
43}
44
45/// A buffer that is stored on the GPU.
46pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {
47    /// Returns a reference as `Any` to enable downcasting.
48    fn as_any(&self) -> &dyn Any;
49
50    /// Returns the length of the buffer in bytes.
51    fn len(&self) -> usize;
52
53    /// Returns the alignment of the buffer.
54    fn alignment(&self) -> Alignment;
55
56    /// Returns true if the buffer is empty.
57    fn is_empty(&self) -> bool {
58        self.len() == 0
59    }
60
61    /// Attempts to copy the device buffer to a host ByteBuffer.
62    ///
63    /// # Errors
64    ///
65    /// This operation may fail, depending on the device implementation and the underlying hardware.
66    fn copy_to_host_sync(&self, alignment: Alignment) -> VortexResult<ByteBuffer>;
67
68    /// Copies the device buffer to a host buffer asynchronously.
69    ///
70    /// Schedules an async copy and returns a future that completes when the copy is finished.
71    ///
72    /// # Arguments
73    ///
74    /// * `alignment` - The memory alignment to use for the host buffer.
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if the async copy operation fails.
79    fn copy_to_host(
80        &self,
81        alignment: Alignment,
82    ) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>>;
83
84    /// Create a new buffer that references a subrange of this buffer at the given
85    /// slice indices.
86    ///
87    /// Note that slice indices are in byte units.
88    fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
89
90    /// Return a buffer with the given alignment. Where possible, this will be zero-copy.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the buffer cannot be aligned (e.g., allocation or copy failure).
95    fn aligned(self: Arc<Self>, alignment: Alignment) -> VortexResult<Arc<dyn DeviceBuffer>>;
96}
97
98pub trait DeviceBufferExt: DeviceBuffer {
99    /// Slice a range of elements `T` out of the device buffer.
100    fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
101}
102
103impl<B: DeviceBuffer> DeviceBufferExt for B {
104    fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer> {
105        let start_bytes = range.start * size_of::<T>();
106        let end_bytes = range.end * size_of::<T>();
107        self.slice(start_bytes..end_bytes)
108    }
109}
110
111impl Hash for dyn DeviceBuffer {
112    fn hash<H: Hasher>(&self, state: &mut H) {
113        self.dyn_hash(state);
114    }
115}
116
117impl PartialEq for dyn DeviceBuffer {
118    fn eq(&self, other: &Self) -> bool {
119        self.dyn_eq(other)
120    }
121}
122impl Eq for dyn DeviceBuffer {}
123
124impl BufferHandle {
125    /// Create a new handle to a host [`ByteBuffer`].
126    pub fn new_host(byte_buffer: ByteBuffer) -> Self {
127        BufferHandle(Inner::Host(byte_buffer))
128    }
129
130    /// Create a new handle to a memory allocation that exists on an external device.
131    ///
132    /// Allocations on external devices are not cheaply accessible from the CPU and most be copied
133    /// into new memory when we read them.
134    pub fn new_device(device: Arc<dyn DeviceBuffer>) -> Self {
135        BufferHandle(Inner::Device(device))
136    }
137}
138
139impl BufferHandle {
140    /// Returns `true` if this buffer resides on the device (GPU).
141    pub fn is_on_device(&self) -> bool {
142        matches!(&self.0, Inner::Device(_))
143    }
144
145    /// Returns `true` if this buffer resides on the host (CPU).
146    pub fn is_on_host(&self) -> bool {
147        matches!(&self.0, Inner::Host(_))
148    }
149
150    /// Gets the size of the buffer, in bytes.
151    pub fn len(&self) -> usize {
152        match &self.0 {
153            Inner::Host(bytes) => bytes.len(),
154            Inner::Device(device) => device.len(),
155        }
156    }
157
158    /// Returns the alignment of the buffer.
159    pub fn alignment(&self) -> Alignment {
160        match &self.0 {
161            Inner::Host(bytes) => bytes.alignment(),
162            Inner::Device(device) => device.alignment(),
163        }
164    }
165
166    /// Returns true if the buffer is aligned to the given alignment.
167    pub fn is_aligned_to(&self, alignment: Alignment) -> bool {
168        self.alignment().is_aligned_to(alignment)
169    }
170
171    /// Ensure the buffer satisfies the requested alignment.
172    ///
173    /// Both host and device buffers will be copied if necessary to satisfy the alignment.
174    pub fn ensure_aligned(self, alignment: Alignment) -> VortexResult<Self> {
175        match self.0 {
176            Inner::Host(buffer) => Ok(BufferHandle::new_host(buffer.aligned(alignment))),
177            Inner::Device(device) => Ok(BufferHandle::new_device(device.aligned(alignment)?)),
178        }
179    }
180
181    /// Check if the buffer is empty.
182    pub fn is_empty(&self) -> bool {
183        self.len() == 0
184    }
185
186    /// Creates a new handle to a subrange of memory at the given bind range.
187    ///
188    ///
189    /// # Example
190    ///
191    /// ```
192    /// # use vortex_array::buffer::BufferHandle;
193    /// # use vortex_buffer::buffer;
194    /// let handle1 = BufferHandle::new_host(buffer![1u8,2,3,4]);
195    /// let handle2 = handle1.slice(1..4);
196    /// assert_eq!(handle2.unwrap_host(), buffer![2u8,3,4]);
197    /// ```
198    pub fn slice(&self, range: Range<usize>) -> Self {
199        match &self.0 {
200            Inner::Host(host) => BufferHandle::new_host(host.slice(range)),
201            Inner::Device(device) => BufferHandle::new_device(device.slice(range)),
202        }
203    }
204
205    /// Reinterpret the pointee as a buffer of `T` and slice the provided element range.
206    ///
207    /// # Example
208    ///
209    /// ```
210    /// # use vortex_array::buffer::BufferHandle;
211    /// # use vortex_buffer::{buffer, Buffer};
212    /// let values = buffer![1u32, 2u32, 3u32, 4u32];
213    /// let handle = BufferHandle::new_host(values.into_byte_buffer());
214    /// let sliced = handle.slice_typed::<u32>(1..4);
215    /// let result = Buffer::<u32>::from_byte_buffer(sliced.to_host_sync());
216    /// assert_eq!(result, buffer![2, 3, 4]);
217    /// ```
218    pub fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Self {
219        let start = range.start * size_of::<T>();
220        let end = range.end * size_of::<T>();
221
222        self.slice(start..end)
223    }
224
225    #[expect(clippy::panic)]
226    /// Unwraps the handle as host memory.
227    ///
228    /// # Panics
229    ///
230    /// This will panic if the handle points to device memory.
231    pub fn unwrap_host(self) -> ByteBuffer {
232        match self.0 {
233            Inner::Host(b) => b,
234            Inner::Device(_) => panic!("unwrap_host called for Device allocation"),
235        }
236    }
237
238    #[expect(clippy::panic)]
239    /// Unwraps the handle as device memory.
240    ///
241    /// # Panics
242    ///
243    /// This will panic if the handle points to host memory.
244    pub fn unwrap_device(self) -> Arc<dyn DeviceBuffer> {
245        match self.0 {
246            Inner::Device(b) => b,
247            Inner::Host(_) => panic!("unwrap_device called for Host allocation"),
248        }
249    }
250
251    /// Downcast this handle as a handle to a host-resident buffer, or `None`.
252    #[inline]
253    pub fn as_host_opt(&self) -> Option<&ByteBuffer> {
254        match &self.0 {
255            Inner::Host(buffer) => Some(buffer),
256            Inner::Device(_) => None,
257        }
258    }
259
260    /// Downcast this handle as a handle to a device buffer, or `None`.
261    pub fn as_device_opt(&self) -> Option<&Arc<dyn DeviceBuffer>> {
262        match &self.0 {
263            Inner::Host(_) => None,
264            Inner::Device(device) => Some(device),
265        }
266    }
267
268    /// A version of [`as_host_opt`][Self::as_host_opt] that panics if the allocation is
269    /// not a host allocation.
270    #[inline]
271    pub fn as_host(&self) -> &ByteBuffer {
272        self.as_host_opt().vortex_expect("expected host buffer")
273    }
274
275    /// A version of [`as_device_opt`][Self::as_device_opt] that panics if the allocation is
276    /// not a device allocation.
277    pub fn as_device(&self) -> &Arc<dyn DeviceBuffer> {
278        self.as_device_opt().vortex_expect("expected device buffer")
279    }
280
281    /// Returns a host-resident copy of the data in the buffer.
282    ///
283    /// If the data was already host-resident, this is trivial.
284    ///
285    /// If the data was device-resident, data will be copied from the device to a new allocation
286    /// on the host.
287    ///
288    /// # Panics
289    ///
290    /// This function will never panic if the data is already host-resident.
291    ///
292    /// For a device-resident handle, any errors triggered by the copying from device to host will
293    /// result in a panic.
294    ///
295    /// See also: [`try_to_host`][Self::try_to_host].
296    pub fn to_host_sync(&self) -> ByteBuffer {
297        self.try_to_host_sync()
298            .vortex_expect("to_host: copy from device to host failed")
299    }
300
301    /// Returns a host-resident copy of the data behind the handle, consuming the handle.
302    ///
303    /// If the data was already host-resident, this completes trivially.
304    ///
305    /// See also [`to_host`][Self::to_host].
306    ///
307    /// # Panics
308    ///
309    /// See the panic documentation on [`to_host`][Self::to_host].
310    pub fn into_host_sync(self) -> ByteBuffer {
311        self.try_into_host_sync()
312            .vortex_expect("into_host: copy from device to host failed")
313    }
314
315    /// Attempts to load this buffer into a host-resident allocation.
316    ///
317    /// If the allocation is already host-resident, this trivially completes with success.
318    ///
319    /// If it is a device allocation, then this issues an operation that attempts to copy the data
320    /// from the device into a host-resident buffer, and returns a handle to that buffer.
321    pub fn try_to_host_sync(&self) -> VortexResult<ByteBuffer> {
322        match &self.0 {
323            Inner::Host(b) => Ok(b.clone()),
324            Inner::Device(device) => device.copy_to_host_sync(ALIGNMENT_TO_HOST_COPY),
325        }
326    }
327
328    /// Attempts to load this buffer into a host-resident allocation, consuming the handle.
329    ///
330    /// See also [`try_to_host`][Self::try_to_host].
331    pub fn try_into_host_sync(self) -> VortexResult<ByteBuffer> {
332        match self.0 {
333            Inner::Host(b) => Ok(b),
334            Inner::Device(device) => device.copy_to_host_sync(ALIGNMENT_TO_HOST_COPY),
335        }
336    }
337
338    /// Asynchronously copies the buffer to the host.
339    ///
340    /// This is a no-op if the buffer is already on the host.
341    ///
342    /// # Returns
343    ///
344    /// A future that resolves to the host buffer when the copy completes.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if the async copy operation fails.
349    pub fn try_to_host(&self) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>> {
350        match &self.0 {
351            Inner::Host(b) => {
352                let buffer = b.clone();
353                Ok(Box::pin(async move { Ok(buffer) }))
354            }
355            Inner::Device(device) => device.copy_to_host(ALIGNMENT_TO_HOST_COPY),
356        }
357    }
358
359    /// Asynchronously copies the buffer to the host, consuming the handle.
360    ///
361    /// This is a no-op if the buffer is already on the host.
362    ///
363    /// # Returns
364    ///
365    /// A future that resolves to the host buffer when the copy completes.
366    ///
367    /// # Errors
368    ///
369    /// Returns an error if the async copy operation fails.
370    pub fn try_into_host(self) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>> {
371        match self.0 {
372            Inner::Host(b) => Ok(Box::pin(async move { Ok(b) })),
373            Inner::Device(device) => device.copy_to_host(ALIGNMENT_TO_HOST_COPY),
374        }
375    }
376
377    /// Asynchronously copies the buffer to the host.
378    ///
379    /// # Panics
380    ///
381    /// Any errors triggered by the copying from device to host will result in a panic.
382    pub fn to_host(&self) -> BoxFuture<'static, ByteBuffer> {
383        let future = self
384            .try_to_host()
385            .vortex_expect("to_host: failed to initiate copy from device to host");
386        Box::pin(async move {
387            future
388                .await
389                .vortex_expect("to_host: copy from device to host failed")
390        })
391    }
392
393    /// Asynchronously copies the buffer to the host, consuming the handle.
394    ///
395    /// # Panics
396    ///
397    /// Any errors triggered by the copying from device to host will result in a panic.
398    pub fn into_host(self) -> BoxFuture<'static, ByteBuffer> {
399        let future = self
400            .try_into_host()
401            .vortex_expect("into_host: failed to initiate copy from device to host");
402        Box::pin(async move {
403            future
404                .await
405                .vortex_expect("into_host: copy from device to host failed")
406        })
407    }
408}
409
410impl ArrayHash for BufferHandle {
411    // TODO(aduffy): implement for array hash
412    fn array_hash<H: Hasher>(&self, state: &mut H, accuracy: EqMode) {
413        match &self.0 {
414            Inner::Host(host) => host.array_hash(state, accuracy),
415            Inner::Device(dev) => match accuracy {
416                EqMode::Ptr => {
417                    Arc::as_ptr(dev).hash(state);
418                }
419                EqMode::Value => {
420                    dev.hash(state);
421                }
422            },
423        }
424    }
425}
426
427impl ArrayEq for BufferHandle {
428    fn array_eq(&self, other: &Self, accuracy: EqMode) -> bool {
429        match (&self.0, &other.0) {
430            (Inner::Host(b), Inner::Host(b2)) => b.array_eq(b2, accuracy),
431            (Inner::Device(b), Inner::Device(b2)) => match accuracy {
432                EqMode::Ptr => Arc::ptr_eq(b, b2),
433                EqMode::Value => b.eq(b2),
434            },
435            _ => false,
436        }
437    }
438}