vortex_array/buffer.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::hash::Hasher;
8use std::ops::Range;
9use std::sync::Arc;
10
11use futures::future::BoxFuture;
12use vortex_buffer::Alignment;
13use vortex_buffer::ByteBuffer;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_utils::dyn_traits::DynEq;
17use vortex_utils::dyn_traits::DynHash;
18
19use crate::ArrayEq;
20use crate::ArrayHash;
21use crate::EqMode;
22
23/// A handle to a buffer allocation.
24///
25/// There are two kinds of buffer allocations supported:
26///
27/// * **host** allocations, which were allocated by the global allocator and reside in main memory
28/// * **device** allocations, which are remote to the CPU and live on a GPU or other external
29/// device.
30///
31/// A device allocation can be copied to the host, yielding a new [`ByteBuffer`] containing the
32/// copied data. Copying can fail at runtime, error recovery is system-dependent.
33#[derive(Debug, Clone)]
34pub struct BufferHandle(Inner);
35
36#[derive(Debug, Clone)]
37enum Inner {
38 /// On the host/cpu.
39 Host(ByteBuffer),
40 /// On the device/gpu.
41 Device(Arc<dyn DeviceBuffer>),
42}
43
44/// A buffer that is stored on the GPU.
45pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {
46 /// Returns a reference as `Any` to enable downcasting.
47 fn as_any(&self) -> &dyn Any;
48
49 /// Returns the length of the buffer in bytes.
50 fn len(&self) -> usize;
51
52 /// Returns the alignment of the buffer.
53 fn alignment(&self) -> Alignment;
54
55 /// Returns true if the buffer is empty.
56 fn is_empty(&self) -> bool {
57 self.len() == 0
58 }
59
60 /// Attempts to copy the device buffer to a host ByteBuffer.
61 ///
62 /// # Errors
63 ///
64 /// This operation may fail, depending on the device implementation and the underlying hardware.
65 fn copy_to_host_sync(&self, alignment: Alignment) -> VortexResult<ByteBuffer>;
66
67 /// Copies the device buffer to a host buffer asynchronously.
68 ///
69 /// Schedules an async copy and returns a future that completes when the copy is finished.
70 ///
71 /// # Arguments
72 ///
73 /// * `alignment` - The memory alignment to use for the host buffer.
74 ///
75 /// # Errors
76 ///
77 /// Returns an error if the async copy operation fails.
78 fn copy_to_host(
79 &self,
80 alignment: Alignment,
81 ) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>>;
82
83 /// Create a new buffer that references a subrange of this buffer at the given
84 /// slice indices.
85 ///
86 /// Note that slice indices are in byte units.
87 fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
88
89 /// Return a buffer with the given alignment. Where possible, this will be zero-copy.
90 ///
91 /// # Errors
92 ///
93 /// Returns an error if the buffer cannot be aligned (e.g., allocation or copy failure).
94 fn aligned(self: Arc<Self>, alignment: Alignment) -> VortexResult<Arc<dyn DeviceBuffer>>;
95}
96
97pub trait DeviceBufferExt: DeviceBuffer {
98 /// Slice a range of elements `T` out of the device buffer.
99 fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
100}
101
102impl<B: DeviceBuffer> DeviceBufferExt for B {
103 fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer> {
104 let start_bytes = range.start * size_of::<T>();
105 let end_bytes = range.end * size_of::<T>();
106 self.slice(start_bytes..end_bytes)
107 }
108}
109
110impl Hash for dyn DeviceBuffer {
111 fn hash<H: Hasher>(&self, state: &mut H) {
112 self.dyn_hash(state);
113 }
114}
115
116impl PartialEq for dyn DeviceBuffer {
117 fn eq(&self, other: &Self) -> bool {
118 self.dyn_eq(other)
119 }
120}
121impl Eq for dyn DeviceBuffer {}
122
123impl BufferHandle {
124 /// Create a new handle to a host [`ByteBuffer`].
125 pub fn new_host(byte_buffer: ByteBuffer) -> Self {
126 BufferHandle(Inner::Host(byte_buffer))
127 }
128
129 /// Create a new handle to a memory allocation that exists on an external device.
130 ///
131 /// Allocations on external devices are not cheaply accessible from the CPU and most be copied
132 /// into new memory when we read them.
133 pub fn new_device(device: Arc<dyn DeviceBuffer>) -> Self {
134 BufferHandle(Inner::Device(device))
135 }
136}
137
138impl BufferHandle {
139 /// Returns `true` if this buffer resides on the device (GPU).
140 pub fn is_on_device(&self) -> bool {
141 matches!(&self.0, Inner::Device(_))
142 }
143
144 /// Returns `true` if this buffer resides on the host (CPU).
145 pub fn is_on_host(&self) -> bool {
146 matches!(&self.0, Inner::Host(_))
147 }
148
149 /// Gets the size of the buffer, in bytes.
150 pub fn len(&self) -> usize {
151 match &self.0 {
152 Inner::Host(bytes) => bytes.len(),
153 Inner::Device(device) => device.len(),
154 }
155 }
156
157 /// Returns the alignment of the buffer.
158 pub fn alignment(&self) -> Alignment {
159 match &self.0 {
160 Inner::Host(bytes) => bytes.alignment(),
161 Inner::Device(device) => device.alignment(),
162 }
163 }
164
165 /// Returns true if the buffer is aligned to the given alignment.
166 pub fn is_aligned_to(&self, alignment: Alignment) -> bool {
167 self.alignment().is_aligned_to(alignment)
168 }
169
170 /// Ensure the buffer satisfies the requested alignment.
171 ///
172 /// Both host and device buffers will be copied if necessary to satisfy the alignment.
173 pub fn ensure_aligned(self, alignment: Alignment) -> VortexResult<Self> {
174 match self.0 {
175 Inner::Host(buffer) => Ok(BufferHandle::new_host(buffer.aligned(alignment))),
176 Inner::Device(device) => Ok(BufferHandle::new_device(device.aligned(alignment)?)),
177 }
178 }
179
180 /// Check if the buffer is empty.
181 pub fn is_empty(&self) -> bool {
182 self.len() == 0
183 }
184
185 /// Creates a new handle to a subrange of memory at the given bind range.
186 ///
187 ///
188 /// # Example
189 ///
190 /// ```
191 /// # use vortex_array::buffer::BufferHandle;
192 /// # use vortex_buffer::buffer;
193 /// let handle1 = BufferHandle::new_host(buffer![1u8,2,3,4]);
194 /// let handle2 = handle1.slice(1..4);
195 /// assert_eq!(handle2.unwrap_host(), buffer![2u8,3,4]);
196 /// ```
197 pub fn slice(&self, range: Range<usize>) -> Self {
198 match &self.0 {
199 Inner::Host(host) => BufferHandle::new_host(host.slice(range)),
200 Inner::Device(device) => BufferHandle::new_device(device.slice(range)),
201 }
202 }
203
204 /// Reinterpret the pointee as a buffer of `T` and slice the provided element range.
205 ///
206 /// # Example
207 ///
208 /// ```
209 /// # use vortex_array::buffer::BufferHandle;
210 /// # use vortex_buffer::{buffer, Buffer};
211 /// let values = buffer![1u32, 2u32, 3u32, 4u32];
212 /// let handle = BufferHandle::new_host(values.into_byte_buffer());
213 /// let sliced = handle.slice_typed::<u32>(1..4);
214 /// let result = Buffer::<u32>::from_byte_buffer(sliced.to_host_sync());
215 /// assert_eq!(result, buffer![2, 3, 4]);
216 /// ```
217 pub fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Self {
218 let start = range.start * size_of::<T>();
219 let end = range.end * size_of::<T>();
220
221 self.slice(start..end)
222 }
223
224 #[expect(clippy::panic)]
225 /// Unwraps the handle as host memory.
226 ///
227 /// # Panics
228 ///
229 /// This will panic if the handle points to device memory.
230 pub fn unwrap_host(self) -> ByteBuffer {
231 match self.0 {
232 Inner::Host(b) => b,
233 Inner::Device(_) => panic!("unwrap_host called for Device allocation"),
234 }
235 }
236
237 #[expect(clippy::panic)]
238 /// Unwraps the handle as device memory.
239 ///
240 /// # Panics
241 ///
242 /// This will panic if the handle points to host memory.
243 pub fn unwrap_device(self) -> Arc<dyn DeviceBuffer> {
244 match self.0 {
245 Inner::Device(b) => b,
246 Inner::Host(_) => panic!("unwrap_device called for Host allocation"),
247 }
248 }
249
250 /// Downcast this handle as a handle to a host-resident buffer, or `None`.
251 #[inline]
252 pub fn as_host_opt(&self) -> Option<&ByteBuffer> {
253 match &self.0 {
254 Inner::Host(buffer) => Some(buffer),
255 Inner::Device(_) => None,
256 }
257 }
258
259 /// Downcast this handle as a handle to a device buffer, or `None`.
260 pub fn as_device_opt(&self) -> Option<&Arc<dyn DeviceBuffer>> {
261 match &self.0 {
262 Inner::Host(_) => None,
263 Inner::Device(device) => Some(device),
264 }
265 }
266
267 /// A version of [`as_host_opt`][Self::as_host_opt] that panics if the allocation is
268 /// not a host allocation.
269 #[inline]
270 pub fn as_host(&self) -> &ByteBuffer {
271 self.as_host_opt().vortex_expect("expected host buffer")
272 }
273
274 /// A version of [`as_device_opt`][Self::as_device_opt] that panics if the allocation is
275 /// not a device allocation.
276 pub fn as_device(&self) -> &Arc<dyn DeviceBuffer> {
277 self.as_device_opt().vortex_expect("expected device buffer")
278 }
279
280 /// Returns a host-resident copy of the data in the buffer.
281 ///
282 /// If the data was already host-resident, this is trivial.
283 ///
284 /// If the data was device-resident, data will be copied from the device to a new allocation
285 /// on the host.
286 ///
287 /// # Panics
288 ///
289 /// This function will never panic if the data is already host-resident.
290 ///
291 /// For a device-resident handle, any errors triggered by the copying from device to host will
292 /// result in a panic.
293 ///
294 /// See also: [`try_to_host`][Self::try_to_host].
295 pub fn to_host_sync(&self) -> ByteBuffer {
296 self.try_to_host_sync()
297 .vortex_expect("to_host: copy from device to host failed")
298 }
299
300 /// Returns a host-resident copy of the data behind the handle, consuming the handle.
301 ///
302 /// If the data was already host-resident, this completes trivially.
303 ///
304 /// See also [`to_host`][Self::to_host].
305 ///
306 /// # Panics
307 ///
308 /// See the panic documentation on [`to_host`][Self::to_host].
309 pub fn into_host_sync(self) -> ByteBuffer {
310 self.try_into_host_sync()
311 .vortex_expect("into_host: copy from device to host failed")
312 }
313
314 /// Attempts to load this buffer into a host-resident allocation.
315 ///
316 /// If the allocation is already host-resident, this trivially completes with success.
317 ///
318 /// If it is a device allocation, then this issues an operation that attempts to copy the data
319 /// from the device into a host-resident buffer, and returns a handle to that buffer.
320 pub fn try_to_host_sync(&self) -> VortexResult<ByteBuffer> {
321 match &self.0 {
322 Inner::Host(b) => Ok(b.clone()),
323 Inner::Device(device) => device.copy_to_host_sync(Alignment::HOST_COPY),
324 }
325 }
326
327 /// Attempts to load this buffer into a host-resident allocation, consuming the handle.
328 ///
329 /// See also [`try_to_host`][Self::try_to_host].
330 pub fn try_into_host_sync(self) -> VortexResult<ByteBuffer> {
331 match self.0 {
332 Inner::Host(b) => Ok(b),
333 Inner::Device(device) => device.copy_to_host_sync(Alignment::HOST_COPY),
334 }
335 }
336
337 /// Asynchronously copies the buffer to the host.
338 ///
339 /// This is a no-op if the buffer is already on the host.
340 ///
341 /// # Returns
342 ///
343 /// A future that resolves to the host buffer when the copy completes.
344 ///
345 /// # Errors
346 ///
347 /// Returns an error if the async copy operation fails.
348 pub fn try_to_host(&self) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>> {
349 match &self.0 {
350 Inner::Host(b) => {
351 let buffer = b.clone();
352 Ok(Box::pin(async move { Ok(buffer) }))
353 }
354 Inner::Device(device) => device.copy_to_host(Alignment::HOST_COPY),
355 }
356 }
357
358 /// Asynchronously copies the buffer to the host, consuming the handle.
359 ///
360 /// This is a no-op if the buffer is already on the host.
361 ///
362 /// # Returns
363 ///
364 /// A future that resolves to the host buffer when the copy completes.
365 ///
366 /// # Errors
367 ///
368 /// Returns an error if the async copy operation fails.
369 pub fn try_into_host(self) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>> {
370 match self.0 {
371 Inner::Host(b) => Ok(Box::pin(async move { Ok(b) })),
372 Inner::Device(device) => device.copy_to_host(Alignment::HOST_COPY),
373 }
374 }
375
376 /// Asynchronously copies the buffer to the host.
377 ///
378 /// # Panics
379 ///
380 /// Any errors triggered by the copying from device to host will result in a panic.
381 pub fn to_host(&self) -> BoxFuture<'static, ByteBuffer> {
382 let future = self
383 .try_to_host()
384 .vortex_expect("to_host: failed to initiate copy from device to host");
385 Box::pin(async move {
386 future
387 .await
388 .vortex_expect("to_host: copy from device to host failed")
389 })
390 }
391
392 /// Asynchronously copies the buffer to the host, consuming the handle.
393 ///
394 /// # Panics
395 ///
396 /// Any errors triggered by the copying from device to host will result in a panic.
397 pub fn into_host(self) -> BoxFuture<'static, ByteBuffer> {
398 let future = self
399 .try_into_host()
400 .vortex_expect("into_host: failed to initiate copy from device to host");
401 Box::pin(async move {
402 future
403 .await
404 .vortex_expect("into_host: copy from device to host failed")
405 })
406 }
407}
408
409impl ArrayHash for BufferHandle {
410 // TODO(aduffy): implement for array hash
411 fn array_hash<H: Hasher>(&self, state: &mut H, accuracy: EqMode) {
412 match &self.0 {
413 Inner::Host(host) => host.array_hash(state, accuracy),
414 Inner::Device(dev) => match accuracy {
415 EqMode::Ptr => {
416 Arc::as_ptr(dev).hash(state);
417 }
418 EqMode::Value => {
419 dev.hash(state);
420 }
421 },
422 }
423 }
424}
425
426impl ArrayEq for BufferHandle {
427 fn array_eq(&self, other: &Self, accuracy: EqMode) -> bool {
428 match (&self.0, &other.0) {
429 (Inner::Host(b), Inner::Host(b2)) => b.array_eq(b2, accuracy),
430 (Inner::Device(b), Inner::Device(b2)) => match accuracy {
431 EqMode::Ptr => Arc::ptr_eq(b, b2),
432 EqMode::Value => b.eq(b2),
433 },
434 _ => false,
435 }
436 }
437}