vortex_array/buffer.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::hash::Hasher;
8use std::ops::Range;
9use std::sync::Arc;
10
11use futures::future::BoxFuture;
12use vortex_buffer::ALIGNMENT_TO_HOST_COPY;
13use vortex_buffer::Alignment;
14use vortex_buffer::ByteBuffer;
15use vortex_error::VortexExpect;
16use vortex_error::VortexResult;
17use vortex_utils::dyn_traits::DynEq;
18use vortex_utils::dyn_traits::DynHash;
19
20use crate::ArrayEq;
21use crate::ArrayHash;
22use crate::Precision;
23
24/// A handle to a buffer allocation.
25///
26/// There are two kinds of buffer allocations supported:
27///
28/// * **host** allocations, which were allocated by the global allocator and reside in main memory
29/// * **device** allocations, which are remote to the CPU and live on a GPU or other external
30/// device.
31///
32/// A device allocation can be copied to the host, yielding a new [`ByteBuffer`] containing the
33/// copied data. Copying can fail at runtime, error recovery is system-dependent.
34#[derive(Debug, Clone)]
35pub struct BufferHandle(Inner);
36
37#[derive(Debug, Clone)]
38enum Inner {
39 /// On the host/cpu.
40 Host(ByteBuffer),
41 /// On the device/gpu.
42 Device(Arc<dyn DeviceBuffer>),
43}
44
45/// A buffer that is stored on the GPU.
46pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {
47 /// Returns a reference as `Any` to enable downcasting.
48 fn as_any(&self) -> &dyn Any;
49
50 /// Returns the length of the buffer in bytes.
51 fn len(&self) -> usize;
52
53 /// Returns the alignment of the buffer.
54 fn alignment(&self) -> Alignment;
55
56 /// Returns true if the buffer is empty.
57 fn is_empty(&self) -> bool {
58 self.len() == 0
59 }
60
61 /// Attempts to copy the device buffer to a host ByteBuffer.
62 ///
63 /// # Errors
64 ///
65 /// This operation may fail, depending on the device implementation and the underlying hardware.
66 fn copy_to_host_sync(&self, alignment: Alignment) -> VortexResult<ByteBuffer>;
67
68 /// Copies the device buffer to a host buffer asynchronously.
69 ///
70 /// Schedules an async copy and returns a future that completes when the copy is finished.
71 ///
72 /// # Arguments
73 ///
74 /// * `alignment` - The memory alignment to use for the host buffer.
75 ///
76 /// # Errors
77 ///
78 /// Returns an error if the async copy operation fails.
79 fn copy_to_host(
80 &self,
81 alignment: Alignment,
82 ) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>>;
83
84 /// Create a new buffer that references a subrange of this buffer at the given
85 /// slice indices.
86 ///
87 /// Note that slice indices are in byte units.
88 fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
89
90 /// Return a buffer with the given alignment. Where possible, this will be zero-copy.
91 ///
92 /// # Errors
93 ///
94 /// Returns an error if the buffer cannot be aligned (e.g., allocation or copy failure).
95 fn aligned(self: Arc<Self>, alignment: Alignment) -> VortexResult<Arc<dyn DeviceBuffer>>;
96}
97
98pub trait DeviceBufferExt: DeviceBuffer {
99 /// Slice a range of elements `T` out of the device buffer.
100 fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
101}
102
103impl<B: DeviceBuffer> DeviceBufferExt for B {
104 fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer> {
105 let start_bytes = range.start * size_of::<T>();
106 let end_bytes = range.end * size_of::<T>();
107 self.slice(start_bytes..end_bytes)
108 }
109}
110
111impl Hash for dyn DeviceBuffer {
112 fn hash<H: Hasher>(&self, state: &mut H) {
113 self.dyn_hash(state);
114 }
115}
116
117impl PartialEq for dyn DeviceBuffer {
118 fn eq(&self, other: &Self) -> bool {
119 self.dyn_eq(other)
120 }
121}
122impl Eq for dyn DeviceBuffer {}
123
124impl BufferHandle {
125 /// Create a new handle to a host [`ByteBuffer`].
126 pub fn new_host(byte_buffer: ByteBuffer) -> Self {
127 BufferHandle(Inner::Host(byte_buffer))
128 }
129
130 /// Create a new handle to a memory allocation that exists on an external device.
131 ///
132 /// Allocations on external devices are not cheaply accessible from the CPU and most be copied
133 /// into new memory when we read them.
134 pub fn new_device(device: Arc<dyn DeviceBuffer>) -> Self {
135 BufferHandle(Inner::Device(device))
136 }
137}
138
139impl BufferHandle {
140 /// Returns `true` if this buffer resides on the device (GPU).
141 pub fn is_on_device(&self) -> bool {
142 matches!(&self.0, Inner::Device(_))
143 }
144
145 /// Returns `true` if this buffer resides on the host (CPU).
146 pub fn is_on_host(&self) -> bool {
147 matches!(&self.0, Inner::Host(_))
148 }
149
150 /// Gets the size of the buffer, in bytes.
151 pub fn len(&self) -> usize {
152 match &self.0 {
153 Inner::Host(bytes) => bytes.len(),
154 Inner::Device(device) => device.len(),
155 }
156 }
157
158 /// Returns the alignment of the buffer.
159 pub fn alignment(&self) -> Alignment {
160 match &self.0 {
161 Inner::Host(bytes) => bytes.alignment(),
162 Inner::Device(device) => device.alignment(),
163 }
164 }
165
166 /// Returns true if the buffer is aligned to the given alignment.
167 pub fn is_aligned_to(&self, alignment: Alignment) -> bool {
168 self.alignment().is_aligned_to(alignment)
169 }
170
171 /// Ensure the buffer satisfies the requested alignment.
172 ///
173 /// Both host and device buffers will be copied if necessary to satisfy the alignment.
174 pub fn ensure_aligned(self, alignment: Alignment) -> VortexResult<Self> {
175 match self.0 {
176 Inner::Host(buffer) => Ok(BufferHandle::new_host(buffer.aligned(alignment))),
177 Inner::Device(device) => Ok(BufferHandle::new_device(device.aligned(alignment)?)),
178 }
179 }
180
181 /// Check if the buffer is empty.
182 pub fn is_empty(&self) -> bool {
183 self.len() == 0
184 }
185
186 /// Creates a new handle to a subrange of memory at the given bind range.
187 ///
188 ///
189 /// # Example
190 ///
191 /// ```
192 /// # use vortex_array::buffer::BufferHandle;
193 /// # use vortex_buffer::buffer;
194 /// let handle1 = BufferHandle::new_host(buffer![1u8,2,3,4]);
195 /// let handle2 = handle1.slice(1..4);
196 /// assert_eq!(handle2.unwrap_host(), buffer![2u8,3,4]);
197 /// ```
198 pub fn slice(&self, range: Range<usize>) -> Self {
199 match &self.0 {
200 Inner::Host(host) => BufferHandle::new_host(host.slice(range)),
201 Inner::Device(device) => BufferHandle::new_device(device.slice(range)),
202 }
203 }
204
205 /// Reinterpret the pointee as a buffer of `T` and slice the provided element range.
206 ///
207 /// # Example
208 ///
209 /// ```
210 /// # use vortex_array::buffer::BufferHandle;
211 /// # use vortex_buffer::{buffer, Buffer};
212 /// let values = buffer![1u32, 2u32, 3u32, 4u32];
213 /// let handle = BufferHandle::new_host(values.into_byte_buffer());
214 /// let sliced = handle.slice_typed::<u32>(1..4);
215 /// let result = Buffer::<u32>::from_byte_buffer(sliced.to_host_sync());
216 /// assert_eq!(result, buffer![2, 3, 4]);
217 /// ```
218 pub fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Self {
219 let start = range.start * size_of::<T>();
220 let end = range.end * size_of::<T>();
221
222 self.slice(start..end)
223 }
224
225 #[allow(clippy::panic)]
226 /// Unwraps the handle as host memory.
227 ///
228 /// # Panics
229 ///
230 /// This will panic if the handle points to device memory.
231 pub fn unwrap_host(self) -> ByteBuffer {
232 match self.0 {
233 Inner::Host(b) => b,
234 Inner::Device(_) => panic!("unwrap_host called for Device allocation"),
235 }
236 }
237
238 #[allow(clippy::panic)]
239 /// Unwraps the handle as device memory.
240 ///
241 /// # Panics
242 ///
243 /// This will panic if the handle points to host memory.
244 pub fn unwrap_device(self) -> Arc<dyn DeviceBuffer> {
245 match self.0 {
246 Inner::Device(b) => b,
247 Inner::Host(_) => panic!("unwrap_device called for Host allocation"),
248 }
249 }
250
251 /// Downcast this handle as a handle to a host-resident buffer, or `None`.
252 pub fn as_host_opt(&self) -> Option<&ByteBuffer> {
253 match &self.0 {
254 Inner::Host(buffer) => Some(buffer),
255 Inner::Device(_) => None,
256 }
257 }
258
259 /// Downcast this handle as a handle to a device buffer, or `None`.
260 pub fn as_device_opt(&self) -> Option<&Arc<dyn DeviceBuffer>> {
261 match &self.0 {
262 Inner::Host(_) => None,
263 Inner::Device(device) => Some(device),
264 }
265 }
266
267 /// A version of [`as_host_opt`][Self::as_host_opt] that panics if the allocation is
268 /// not a host allocation.
269 pub fn as_host(&self) -> &ByteBuffer {
270 self.as_host_opt().vortex_expect("expected host buffer")
271 }
272
273 /// A version of [`as_device_opt`][Self::as_device_opt] that panics if the allocation is
274 /// not a device allocation.
275 pub fn as_device(&self) -> &Arc<dyn DeviceBuffer> {
276 self.as_device_opt().vortex_expect("expected device buffer")
277 }
278
279 /// Returns a host-resident copy of the data in the buffer.
280 ///
281 /// If the data was already host-resident, this is trivial.
282 ///
283 /// If the data was device-resident, data will be copied from the device to a new allocation
284 /// on the host.
285 ///
286 /// # Panics
287 ///
288 /// This function will never panic if the data is already host-resident.
289 ///
290 /// For a device-resident handle, any errors triggered by the copying from device to host will
291 /// result in a panic.
292 ///
293 /// See also: [`try_to_host`][Self::try_to_host].
294 pub fn to_host_sync(&self) -> ByteBuffer {
295 self.try_to_host_sync()
296 .vortex_expect("to_host: copy from device to host failed")
297 }
298
299 /// Returns a host-resident copy of the data behind the handle, consuming the handle.
300 ///
301 /// If the data was already host-resident, this completes trivially.
302 ///
303 /// See also [`to_host`][Self::to_host].
304 ///
305 /// # Panics
306 ///
307 /// See the panic documentation on [`to_host`][Self::to_host].
308 pub fn into_host_sync(self) -> ByteBuffer {
309 self.try_into_host_sync()
310 .vortex_expect("into_host: copy from device to host failed")
311 }
312
313 /// Attempts to load this buffer into a host-resident allocation.
314 ///
315 /// If the allocation is already host-resident, this trivially completes with success.
316 ///
317 /// If it is a device allocation, then this issues an operation that attempts to copy the data
318 /// from the device into a host-resident buffer, and returns a handle to that buffer.
319 pub fn try_to_host_sync(&self) -> VortexResult<ByteBuffer> {
320 match &self.0 {
321 Inner::Host(b) => Ok(b.clone()),
322 Inner::Device(device) => device.copy_to_host_sync(ALIGNMENT_TO_HOST_COPY),
323 }
324 }
325
326 /// Attempts to load this buffer into a host-resident allocation, consuming the handle.
327 ///
328 /// See also [`try_to_host`][Self::try_to_host].
329 pub fn try_into_host_sync(self) -> VortexResult<ByteBuffer> {
330 match self.0 {
331 Inner::Host(b) => Ok(b),
332 Inner::Device(device) => device.copy_to_host_sync(ALIGNMENT_TO_HOST_COPY),
333 }
334 }
335
336 /// Asynchronously copies the buffer to the host.
337 ///
338 /// This is a no-op if the buffer is already on the host.
339 ///
340 /// # Returns
341 ///
342 /// A future that resolves to the host buffer when the copy completes.
343 ///
344 /// # Errors
345 ///
346 /// Returns an error if the async copy operation fails.
347 pub fn try_to_host(&self) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>> {
348 match &self.0 {
349 Inner::Host(b) => {
350 let buffer = b.clone();
351 Ok(Box::pin(async move { Ok(buffer) }))
352 }
353 Inner::Device(device) => device.copy_to_host(ALIGNMENT_TO_HOST_COPY),
354 }
355 }
356
357 /// Asynchronously copies the buffer to the host, consuming the handle.
358 ///
359 /// This is a no-op if the buffer is already on the host.
360 ///
361 /// # Returns
362 ///
363 /// A future that resolves to the host buffer when the copy completes.
364 ///
365 /// # Errors
366 ///
367 /// Returns an error if the async copy operation fails.
368 pub fn try_into_host(self) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>> {
369 match self.0 {
370 Inner::Host(b) => Ok(Box::pin(async move { Ok(b) })),
371 Inner::Device(device) => device.copy_to_host(ALIGNMENT_TO_HOST_COPY),
372 }
373 }
374
375 /// Asynchronously copies the buffer to the host.
376 ///
377 /// # Panics
378 ///
379 /// Any errors triggered by the copying from device to host will result in a panic.
380 pub fn to_host(&self) -> BoxFuture<'static, ByteBuffer> {
381 let future = self
382 .try_to_host()
383 .vortex_expect("to_host: failed to initiate copy from device to host");
384 Box::pin(async move {
385 future
386 .await
387 .vortex_expect("to_host: copy from device to host failed")
388 })
389 }
390
391 /// Asynchronously copies the buffer to the host, consuming the handle.
392 ///
393 /// # Panics
394 ///
395 /// Any errors triggered by the copying from device to host will result in a panic.
396 pub fn into_host(self) -> BoxFuture<'static, ByteBuffer> {
397 let future = self
398 .try_into_host()
399 .vortex_expect("into_host: failed to initiate copy from device to host");
400 Box::pin(async move {
401 future
402 .await
403 .vortex_expect("into_host: copy from device to host failed")
404 })
405 }
406}
407
408impl ArrayHash for BufferHandle {
409 // TODO(aduffy): implement for array hash
410 fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
411 match &self.0 {
412 Inner::Host(host) => host.array_hash(state, precision),
413 Inner::Device(dev) => match precision {
414 Precision::Ptr => {
415 Arc::as_ptr(dev).hash(state);
416 }
417 Precision::Value => {
418 dev.hash(state);
419 }
420 },
421 }
422 }
423}
424
425impl ArrayEq for BufferHandle {
426 fn array_eq(&self, other: &Self, precision: Precision) -> bool {
427 match (&self.0, &other.0) {
428 (Inner::Host(b), Inner::Host(b2)) => b.array_eq(b2, precision),
429 (Inner::Device(b), Inner::Device(b2)) => match precision {
430 Precision::Ptr => Arc::ptr_eq(b, b2),
431 Precision::Value => b.eq(b2),
432 },
433 _ => false,
434 }
435 }
436}