lance_encoding/buffer.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for byte arrays

use std::{borrow::Cow, ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc};

use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
use lance_core::{utils::bit::is_pwr_two, Error, Result};
use snafu::location;
/// A copy-on-write byte buffer.
///
/// It wraps [`arrow_buffer::Buffer`], which provides:
/// - Cheap cloning (reference counted)
/// - Zero-copy slicing
/// - Automatic memory alignment
///
/// LanceBuffer is designed to be used in situations where you might need to
/// pass around byte buffers efficiently without worrying about ownership.
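///
/// # Example
///
/// A minimal sketch of typical usage; the `lance_encoding::buffer` module path
/// in the import is an assumption for illustration:
///
/// ```ignore
/// use lance_encoding::buffer::LanceBuffer;
///
/// let buf = LanceBuffer::from(vec![1u8, 2, 3]);
/// // Clones are cheap: they share the underlying allocation
/// let clone = buf.clone();
/// assert_eq!(buf, clone);
/// ```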
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LanceBuffer(Buffer);

impl LanceBuffer {
    /// Convert into an Arrow buffer. Never copies data.
    pub fn into_buffer(self) -> Buffer {
        self.0
    }

    /// Returns a buffer of the given size (in bytes) with all bits set to 0
    pub fn all_unset(len: usize) -> Self {
        Self(Buffer::from_vec(vec![0; len]))
    }

    /// Returns a buffer of the given size (in bytes) with all bits set to 1
    pub fn all_set(len: usize) -> Self {
        Self(Buffer::from_vec(vec![0xff; len]))
    }

    /// Creates an empty buffer
    pub fn empty() -> Self {
        Self(Buffer::from_vec(Vec::<u8>::new()))
    }

    /// Converts the buffer into a hex string
    pub fn as_hex(&self) -> String {
        hex::encode_upper(self)
    }

    /// Combine multiple buffers into a single buffer
    ///
    /// This does involve a data copy (and allocation of a new buffer)
    pub fn concat(buffers: &[Self]) -> Self {
        let total_len = buffers.iter().map(|b| b.len()).sum();
        let mut data = Vec::with_capacity(total_len);
        for buffer in buffers {
            data.extend_from_slice(buffer.as_ref());
        }
        Self(Buffer::from_vec(data))
    }

    /// Converts the buffer into a hex string, inserting a space
    /// between words
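    ///
    /// # Example
    ///
    /// An illustrative sketch:
    ///
    /// ```ignore
    /// let buf = LanceBuffer::from(vec![0x01u8, 0x02, 0x0F, 0x14]);
    /// // Group the hex output into 2-byte (4 character) words
    /// assert_eq!(buf.as_spaced_hex(2), "0102 0F14");
    /// ```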
    pub fn as_spaced_hex(&self, bytes_per_word: u32) -> String {
        let hex = self.as_hex();
        let chars_per_word = bytes_per_word as usize * 2;
        let num_words = hex.len() / chars_per_word;
        let mut spaced_hex = String::with_capacity(hex.len() + num_words);
        for (i, c) in hex.chars().enumerate() {
            if i % chars_per_word == 0 && i != 0 {
                spaced_hex.push(' ');
            }
            spaced_hex.push(c);
        }
        spaced_hex
    }

    /// Create a LanceBuffer from a bytes::Bytes object
    ///
    /// The alignment must be specified (as `bytes_per_value`) since we want to make
    /// sure we can safely reinterpret the buffer.
    ///
    /// If the buffer is properly aligned this will be zero-copy.  If not, a copy
    /// will be made.
    ///
    /// If `bytes_per_value` is not a power of two, then we assume the buffer is
    /// never going to be reinterpreted as another type and we can safely
    /// ignore the alignment.
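    ///
    /// # Example
    ///
    /// An illustrative sketch:
    ///
    /// ```ignore
    /// // We intend to reinterpret these four bytes as a single u32, so we declare
    /// // 4 bytes per value; if the allocation is 4-byte aligned this is zero-copy
    /// let bytes = bytes::Bytes::from(vec![1u8, 0, 0, 0]);
    /// let buf = LanceBuffer::from_bytes(bytes, 4);
    /// assert_eq!(buf.len(), 4);
    /// ```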
    pub fn from_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            // The original buffer is not aligned, cannot zero-copy
            let mut buf = Vec::with_capacity(bytes.len());
            buf.extend_from_slice(&bytes);
            Self(Buffer::from_vec(buf))
        } else {
            // The original buffer is aligned, can zero-copy
            // SAFETY: the alignment is correct so we can make this conversion
            unsafe {
                Self(Buffer::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                ))
            }
        }
    }

    /// Make an owned copy of the buffer (always copies the data)
    pub fn deep_copy(&self) -> Self {
        Self(Buffer::from_vec(self.0.to_vec()))
    }

    /// Reinterprets a Vec<T> as a LanceBuffer
    ///
    /// This is a zero-copy operation. We can safely reinterpret a Vec<T> into &[u8], which is what
    /// happens here. However, we cannot safely reinterpret a Vec<T> into a Vec<u8> in Rust due to
    /// alignment constraints from [`Vec::from_raw_parts`]:
    ///
    /// > `T` needs to have the same alignment as what `ptr` was allocated with.
    /// > (`T` having a less strict alignment is not sufficient, the alignment really
    /// > needs to be equal to satisfy the [`dealloc`] requirement that memory must be
    /// > allocated and deallocated with the same layout.)
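    ///
    /// # Example
    ///
    /// An illustrative sketch (Lance assumes little-endian machines):
    ///
    /// ```ignore
    /// let buf = LanceBuffer::reinterpret_vec(vec![1u16, 2]);
    /// // Each u16 contributes two bytes in native (little-endian) order
    /// assert_eq!(buf.as_ref(), &[1u8, 0, 2, 0]);
    /// ```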
    pub fn reinterpret_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
        Self(Buffer::from_vec(vec))
    }

    /// Reinterprets an Arc<[T]> as a LanceBuffer
    ///
    /// This is similar to [`Self::reinterpret_vec`] but for Arc<[T]> instead of Vec<T>
    ///
    /// The same alignment constraints apply
    pub fn reinterpret_slice<T: ArrowNativeType + RefUnwindSafe>(arc: Arc<[T]>) -> Self {
        let slice = arc.as_ref();
        let data = NonNull::new(slice.as_ptr() as _).unwrap_or(NonNull::dangling());
        let len = std::mem::size_of_val(slice);
        // SAFETY: the ptr will be valid for `len` bytes as long as the Arc<[T]> is valid
        let buffer = unsafe { Buffer::from_custom_allocation(data, len, Arc::new(arc)) };
        Self(buffer)
    }

    /// Reinterprets a LanceBuffer as a typed [`ScalarBuffer`]
    ///
    /// If the underlying buffer is not properly aligned, this will involve a copy of the data
    ///
    /// Note: doing this sort of re-interpretation generally makes assumptions about the endianness
    /// of the data.  Lance does not support big-endian machines so this is safe.  However, if we end
    /// up supporting big-endian machines in the future, then any use of this method will need to be
    /// carefully reviewed.
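    ///
    /// # Example
    ///
    /// An illustrative sketch:
    ///
    /// ```ignore
    /// let buf = LanceBuffer::reinterpret_vec(vec![1u32, 2, 3]);
    /// // Zero-copy: a buffer created from a Vec<u32> is already aligned for u32
    /// let values = buf.borrow_to_typed_slice::<u32>();
    /// assert_eq!(values.as_ref(), &[1, 2, 3]);
    /// ```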
    pub fn borrow_to_typed_slice<T: ArrowNativeType>(&self) -> ScalarBuffer<T> {
        let align = std::mem::align_of::<T>();
        let is_aligned = self.as_ptr().align_offset(align) == 0;
        if self.len() % std::mem::size_of::<T>() != 0 {
            panic!(
                "attempt to borrow_to_typed_slice to a data type of size {} but the buffer's length of {} bytes is not evenly divisible",
                std::mem::size_of::<T>(),
                self.len()
            );
        }

        if is_aligned {
            ScalarBuffer::<T>::from(self.clone().into_buffer())
        } else {
            let num_values = self.len() / std::mem::size_of::<T>();
            let vec = Vec::<T>::with_capacity(num_values);
            let mut bytes = MutableBuffer::from(vec);
            bytes.extend_from_slice(self.as_ref());
            ScalarBuffer::<T>::from(Buffer::from(bytes))
        }
    }

    /// Reinterprets a LanceBuffer as a typed view `Cow<'_, [T]>`
    ///
    /// Unlike [`Self::borrow_to_typed_slice`], this function returns a `Cow<'_, [T]>` instead of
    /// an owned buffer. This saves the cost of Arc creation and destruction, which can be helpful
    /// when we borrow data briefly and then drop it without reusing it.
    ///
    /// The caller should decide which method to use based on their needs.
    ///
    /// If the underlying buffer is not properly aligned, this will involve a copy of the data
    ///
    /// Note: doing this sort of re-interpretation generally makes assumptions about the endianness
    /// of the data.  Lance does not support big-endian machines so this is safe.  However, if we end
    /// up supporting big-endian machines in the future, then any use of this method will need to be
    /// carefully reviewed.
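    ///
    /// # Example
    ///
    /// An illustrative sketch:
    ///
    /// ```ignore
    /// let buf = LanceBuffer::reinterpret_vec(vec![1u32, 2, 3]);
    /// // Aligned, so we get a borrowed view with no Arc bookkeeping
    /// let view = buf.borrow_to_typed_view::<u32>();
    /// assert_eq!(view.as_ref(), &[1, 2, 3]);
    /// ```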
    pub fn borrow_to_typed_view<T: ArrowNativeType + bytemuck::Pod>(&self) -> Cow<'_, [T]> {
        let align = std::mem::align_of::<T>();
        if self.len() % std::mem::size_of::<T>() != 0 {
            panic!(
                "attempt to view a data type of size {} but the buffer's length of {} bytes is not evenly divisible",
                std::mem::size_of::<T>(),
                self.len()
            );
        }

        if self.as_ptr().align_offset(align) == 0 {
            Cow::Borrowed(bytemuck::cast_slice(self.0.as_slice()))
        } else {
            Cow::Owned(bytemuck::pod_collect_to_vec(self.0.as_slice()))
        }
    }

    /// Concatenates multiple buffers into a single buffer, consuming the input buffers
    ///
    /// If there is only one buffer, it will be returned as-is
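    ///
    /// # Example
    ///
    /// An illustrative sketch:
    ///
    /// ```ignore
    /// let a = LanceBuffer::from(vec![1u8, 2]);
    /// let b = LanceBuffer::from(vec![3u8]);
    /// let combined = LanceBuffer::concat_into_one(vec![a, b]);
    /// assert_eq!(combined.as_ref(), &[1, 2, 3]);
    /// ```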
    pub fn concat_into_one(buffers: Vec<Self>) -> Self {
        if buffers.len() == 1 {
            return buffers.into_iter().next().unwrap();
        }

        let mut total_len = 0;
        for buffer in &buffers {
            total_len += buffer.len();
        }

        let mut data = Vec::with_capacity(total_len);
        for buffer in buffers {
            data.extend_from_slice(buffer.as_ref());
        }

        Self(Buffer::from_vec(data))
    }

    /// Zips multiple buffers into a single buffer, consuming the input buffers
    ///
    /// Unlike [`Self::concat_into_one`], this "zips" the buffers, interleaving the values
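    ///
    /// # Example
    ///
    /// An illustrative sketch zipping a u8 buffer (8 bits per value) with a u16
    /// buffer (16 bits per value):
    ///
    /// ```ignore
    /// let a = LanceBuffer::from(vec![1u8, 2]);
    /// let b = LanceBuffer::reinterpret_vec(vec![10u16, 20]);
    /// let zipped = LanceBuffer::zip_into_one(vec![(a, 8), (b, 16)], 2).unwrap();
    /// // Each zipped value is 1 + 2 = 3 bytes wide (little-endian u16 bytes)
    /// assert_eq!(zipped.as_ref(), &[1, 10, 0, 2, 20, 0]);
    /// ```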
    pub fn zip_into_one(buffers: Vec<(Self, u64)>, num_values: u64) -> Result<Self> {
        let bytes_per_value = buffers
            .iter()
            .map(|(_, bits_per_value)| {
                if bits_per_value % 8 == 0 {
                    Ok(bits_per_value / 8)
                } else {
                    Err(Error::InvalidInput {
                        source: format!(
                            "LanceBuffer::zip_into_one only supports full-byte buffers currently and received a buffer with {} bits per value",
                            bits_per_value
                        )
                        .into(),
                        location: location!(),
                    })
                }
            })
            .collect::<Result<Vec<_>>>()?;
        let total_bytes_per_value = bytes_per_value.iter().sum::<u64>();
        let total_bytes = (total_bytes_per_value * num_values) as usize;

        let mut zipped = vec![0_u8; total_bytes];
        let mut buffer_ptrs = buffers
            .iter()
            .zip(bytes_per_value)
            .map(|((buffer, _), bytes_per_value)| (buffer.as_ptr(), bytes_per_value as usize))
            .collect::<Vec<_>>();

        // Walk the output buffer, copying one value from each input buffer in turn
        let mut zipped_ptr = zipped.as_mut_ptr();
        unsafe {
            let end = zipped_ptr.add(total_bytes);
            while zipped_ptr < end {
                for (buf, bytes_per_value) in buffer_ptrs.iter_mut() {
                    std::ptr::copy_nonoverlapping(*buf, zipped_ptr, *bytes_per_value);
                    zipped_ptr = zipped_ptr.add(*bytes_per_value);
                    *buf = buf.add(*bytes_per_value);
                }
            }
        }

        Ok(Self(Buffer::from_vec(zipped)))
    }

    /// Create a LanceBuffer from a slice
    ///
    /// This is NOT a zero-copy operation.  We can't create a borrowed buffer because
    /// we have no way of extending the lifetime of the slice.
    pub fn copy_slice(slice: &[u8]) -> Self {
        Self(Buffer::from_vec(slice.to_vec()))
    }

    /// Create a LanceBuffer from an array (fixed-size slice)
    ///
    /// This is NOT a zero-copy operation.  The array memory could be on the stack and
    /// thus we can't forget it.
    pub fn copy_array<const N: usize>(array: [u8; N]) -> Self {
        Self(Buffer::from_vec(Vec::from(array)))
    }

    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Returns a new [LanceBuffer] that is a slice of this buffer starting at `offset`,
    /// with `length` bytes.
    ///
    /// Doing so allows the same memory region to be shared between lance buffers.
    ///
    /// # Panics
    ///
    /// Panics if `(offset + length)` is larger than the existing length.
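    ///
    /// # Example
    ///
    /// An illustrative sketch:
    ///
    /// ```ignore
    /// let buf = LanceBuffer::from(vec![1u8, 2, 3, 4]);
    /// // Zero-copy view of the two middle bytes
    /// let sliced = buf.slice_with_length(1, 2);
    /// assert_eq!(sliced.as_ref(), &[2, 3]);
    /// ```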
    pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
        let original_buffer_len = self.len();
        assert!(
            offset.saturating_add(length) <= original_buffer_len,
            "the offset + length of the sliced Buffer cannot exceed the existing length"
        );
        Self(self.0.slice_with_length(offset, length))
    }

    /// Returns a new [LanceBuffer] that is a slice of this buffer starting at bit `offset`
    /// with `length` bits.
    ///
    /// Unlike `slice_with_length`, this method allows for slicing at a bit level, but it
    /// requires a copy of the data unless the offset is byte-aligned.
    ///
    /// This method performs the bit slice using the Arrow convention of *bitwise* little-endian
    ///
    /// This means, given the bit buffer 0bABCDEFGH_IJKLMNOP and a slice starting at bit 3
    /// with length 8, the result will be 0bNOPABCDE
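    ///
    /// # Example
    ///
    /// An illustrative sketch (see also the `test_bit_slice_le` test below):
    ///
    /// ```ignore
    /// let buf = LanceBuffer::from(vec![0x0Fu8, 0x0B]);
    /// // Bits 4..12 in bitwise little-endian order: 0b0000 from 0x0F, then
    /// // 0b1011 from 0x0B, packed as 0b1011_0000 = 0xB0
    /// assert_eq!(buf.bit_slice_le_with_length(4, 8).as_ref(), &[0xB0]);
    /// ```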
    pub fn bit_slice_le_with_length(&self, offset: usize, length: usize) -> Self {
        let sliced = self.0.bit_slice(offset, length);
        Self(sliced)
    }

    /// Get a pointer to the underlying data
    pub fn as_ptr(&self) -> *const u8 {
        self.0.as_ptr()
    }
}

impl AsRef<[u8]> for LanceBuffer {
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}

impl Deref for LanceBuffer {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_ref()
    }
}

// All `From` implementations are zero-copy

impl From<Vec<u8>> for LanceBuffer {
    fn from(buffer: Vec<u8>) -> Self {
        Self(Buffer::from_vec(buffer))
    }
}

impl From<Buffer> for LanceBuffer {
    fn from(buffer: Buffer) -> Self {
        Self(buffer)
    }
}

/// An iterator that owns a clone of the underlying buffer so it can have a
/// 'static lifetime
pub struct LanceBufferIter {
    buffer: Buffer,
    index: usize,
}

impl Iterator for LanceBufferIter {
    type Item = u8;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index >= self.buffer.len() {
            None
        } else {
            // SAFETY: we just checked that the index is in bounds
            let byte = unsafe { self.buffer.get_unchecked(self.index) };
            self.index += 1;
            Some(*byte)
        }
    }
}

impl IntoIterator for LanceBuffer {
    type Item = u8;
    type IntoIter = LanceBufferIter;

    fn into_iter(self) -> Self::IntoIter {
        LanceBufferIter {
            buffer: self.0,
            index: 0,
        }
    }
}

#[cfg(test)]
mod tests {
    use arrow_buffer::Buffer;

    use super::LanceBuffer;

    #[test]
    fn test_eq() {
        let buf = LanceBuffer::from(Buffer::from_vec(vec![1_u8, 2, 3]));
        let buf2 = LanceBuffer::from(vec![1, 2, 3]);
        assert_eq!(buf, buf2);
    }

    #[test]
    fn test_reinterpret_vec() {
        let vec = vec![1_u32, 2, 3];
        let buf = LanceBuffer::reinterpret_vec(vec);

        let mut expected = Vec::with_capacity(12);
        expected.extend_from_slice(&1_u32.to_ne_bytes());
        expected.extend_from_slice(&2_u32.to_ne_bytes());
        expected.extend_from_slice(&3_u32.to_ne_bytes());
        let expected = LanceBuffer::from(expected);

        assert_eq!(expected, buf);
        assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), vec![1, 2, 3]);
    }

    #[test]
    fn test_concat() {
        let buf1 = LanceBuffer::from(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::from(vec![4_u8, 5, 6]);
        let buf3 = LanceBuffer::from(vec![7_u8, 8, 9]);

        let expected = LanceBuffer::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![buf1, buf2, buf3])
        );

        let empty = LanceBuffer::empty();
        assert_eq!(
            LanceBuffer::empty(),
            LanceBuffer::concat_into_one(vec![empty])
        );

        let expected = LanceBuffer::from(vec![1, 2, 3]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![expected.deep_copy(), LanceBuffer::empty()])
        );
    }

    #[test]
    fn test_zip() {
        let buf1 = LanceBuffer::from(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::reinterpret_vec(vec![1_u16, 2, 3]);
        let buf3 = LanceBuffer::reinterpret_vec(vec![1_u32, 2, 3]);

        let zipped = LanceBuffer::zip_into_one(vec![(buf1, 8), (buf2, 16), (buf3, 32)], 3).unwrap();

        assert_eq!(zipped.len(), 21);

        let mut expected = Vec::with_capacity(21);
        for i in 1..4 {
            expected.push(i as u8);
            expected.extend_from_slice(&(i as u16).to_ne_bytes());
            expected.extend_from_slice(&(i as u32).to_ne_bytes());
        }
        let expected = LanceBuffer::from(expected);

        assert_eq!(expected, zipped);
    }

    #[test]
    fn test_hex() {
        let buf = LanceBuffer::from(vec![1, 2, 15, 20]);
        assert_eq!("01020F14", buf.as_hex());
    }

    #[test]
    #[should_panic]
    fn test_to_typed_slice_invalid() {
        let buf = LanceBuffer::from(vec![0, 1, 2]);
        buf.borrow_to_typed_slice::<u16>();
    }

    #[test]
    fn test_to_typed_slice() {
        // Buffer is aligned, no copy will be made, both calls
        // should get same ptr
        let buf = LanceBuffer::from(vec![0, 1]);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_eq!(view_ptr, view_ptr2);

        let bytes = bytes::Bytes::from(vec![0, 1, 2]);
        let sliced = bytes.slice(1..3);
        // Intentionally claim 1-byte alignment so `from_bytes` keeps the (unaligned)
        // pointer; borrowing as u16 must then copy, giving a new ptr on each call
        let buf = LanceBuffer::from_bytes(sliced, 1);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_ne!(view_ptr, view_ptr2);
    }

    #[test]
    fn test_bit_slice_le() {
        let buf = LanceBuffer::from(vec![0x0F, 0x0B]);

        // Keep in mind that validity buffers are *bitwise* little-endian
        assert_eq!(buf.bit_slice_le_with_length(0, 4).as_ref(), &[0x0F]);
        assert_eq!(buf.bit_slice_le_with_length(4, 4).as_ref(), &[0x00]);
        assert_eq!(buf.bit_slice_le_with_length(3, 8).as_ref(), &[0x61]);
        assert_eq!(buf.bit_slice_le_with_length(0, 8).as_ref(), &[0x0F]);
        assert_eq!(buf.bit_slice_le_with_length(4, 8).as_ref(), &[0xB0]);
        assert_eq!(buf.bit_slice_le_with_length(4, 12).as_ref(), &[0xB0, 0x00]);
    }
}