// lance_encoding/buffer.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for byte arrays

use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc};

use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
use lance_core::{utils::bit::is_pwr_two, Error, Result};
use snafu::location;

/// A copy-on-write byte buffer.
///
/// It wraps an [`arrow_buffer::Buffer`], which provides:
/// - Cheap cloning (reference counted)
/// - Zero-copy slicing
/// - Automatic memory alignment
///
/// LanceBuffer is designed to be used in situations where you might need to
/// pass around byte buffers efficiently without worrying about ownership.
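///
/// # Example
///
/// A small illustrative sketch of the cheap-clone and zero-copy-slice
/// behavior described above:
///
/// ```ignore
/// let buf = LanceBuffer::from(vec![1_u8, 2, 3, 4]);
/// // Cloning is cheap: both handles point at the same allocation.
/// let clone = buf.clone();
/// assert_eq!(buf.as_ptr(), clone.as_ptr());
/// // Slicing is zero-copy: the slice views the same memory region.
/// assert_eq!(buf.slice_with_length(1, 2).as_ref(), &[2, 3]);
/// ```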
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LanceBuffer(Buffer);

impl LanceBuffer {
    /// Convert into an Arrow buffer. Never copies data.
    pub fn into_buffer(self) -> Buffer {
        self.0
    }

    /// Returns a buffer of the given size with all bits set to 0
    pub fn all_unset(len: usize) -> Self {
        Self(Buffer::from_vec(vec![0; len]))
    }

    /// Returns a buffer of the given size with all bits set to 1
    pub fn all_set(len: usize) -> Self {
        Self(Buffer::from_vec(vec![0xff; len]))
    }

    /// Creates an empty buffer
    pub fn empty() -> Self {
        Self(Buffer::from_vec(Vec::<u8>::new()))
    }

    /// Converts the buffer into a hex string
    pub fn as_hex(&self) -> String {
        hex::encode_upper(self)
    }

    /// Combine multiple buffers into a single buffer
    ///
    /// This does involve a data copy (and allocation of a new buffer)
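    ///
    /// # Example
    ///
    /// A minimal sketch of combining two buffers:
    ///
    /// ```ignore
    /// let a = LanceBuffer::from(vec![1_u8, 2]);
    /// let b = LanceBuffer::from(vec![3_u8]);
    /// assert_eq!(LanceBuffer::concat(&[a, b]).as_ref(), &[1, 2, 3]);
    /// ```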
    pub fn concat(buffers: &[Self]) -> Self {
        let total_len = buffers.iter().map(|b| b.len()).sum();
        let mut data = Vec::with_capacity(total_len);
        for buffer in buffers {
            data.extend_from_slice(buffer.as_ref());
        }
        Self(Buffer::from_vec(data))
    }

    /// Converts the buffer into a hex string, inserting a space
    /// between words
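    ///
    /// # Example
    ///
    /// An illustrative sketch with two-byte words:
    ///
    /// ```ignore
    /// let buf = LanceBuffer::from(vec![0x01, 0x02, 0x03, 0x04]);
    /// assert_eq!(buf.as_spaced_hex(2), "0102 0304");
    /// ```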
    pub fn as_spaced_hex(&self, bytes_per_word: u32) -> String {
        let hex = self.as_hex();
        let chars_per_word = bytes_per_word as usize * 2;
        let num_words = hex.len() / chars_per_word;
        let mut spaced_hex = String::with_capacity(hex.len() + num_words);
        for (i, c) in hex.chars().enumerate() {
            if i % chars_per_word == 0 && i != 0 {
                spaced_hex.push(' ');
            }
            spaced_hex.push(c);
        }
        spaced_hex
    }

    /// Create a LanceBuffer from a bytes::Bytes object
    ///
    /// The alignment must be specified (as `bytes_per_value`) since we want to make
    /// sure we can safely reinterpret the buffer.
    ///
    /// If the buffer is properly aligned this will be zero-copy.  If not, a copy
    /// will be made.
    ///
    /// If `bytes_per_value` is not a power of two, then we assume the buffer is
    /// never going to be reinterpreted into another type and we can safely
    /// ignore the alignment.
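    ///
    /// # Example
    ///
    /// A sketch of round-tripping `bytes::Bytes` into a typed view (the
    /// expected values assume a little-endian host, which Lance requires):
    ///
    /// ```ignore
    /// let bytes = bytes::Bytes::from(vec![1_u8, 0, 2, 0]);
    /// let buf = LanceBuffer::from_bytes(bytes, 2);
    /// assert_eq!(buf.borrow_to_typed_slice::<u16>().as_ref(), &[1_u16, 2]);
    /// ```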
    pub fn from_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            // The original buffer is not aligned, cannot zero-copy
            let mut buf = Vec::with_capacity(bytes.len());
            buf.extend_from_slice(&bytes);
            Self(Buffer::from_vec(buf))
        } else {
            // The original buffer is aligned, can zero-copy
            // SAFETY: the alignment is correct, so we can make this conversion
            unsafe {
                Self(Buffer::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                ))
            }
        }
    }

    /// Convert a buffer into a bytes::Bytes object
    ///
    /// This conversion is zero-copy.
    ///
    /// Panics if the underlying buffer cannot be converted back into a `Vec<u8>`
    /// (e.g. it is shared or was created from a custom allocation).
    pub fn into_bytes(self) -> bytes::Bytes {
        self.0.into_vec::<u8>().unwrap().into()
    }

    /// Creates an owned copy of the buffer; this always involves a full copy of the bytes
    pub fn to_owned(&self) -> Self {
        Self(Buffer::from_vec(self.0.to_vec()))
    }

    /// Make an owned copy of the buffer (always copies the data, equivalent to
    /// [`Self::to_owned`])
    pub fn deep_copy(&self) -> Self {
        Self(Buffer::from_vec(self.0.to_vec()))
    }

    /// Reinterprets a Vec<T> as a LanceBuffer
    ///
    /// This is a zero-copy operation. We can safely reinterpret a Vec<T> into a &[u8], which is
    /// what happens here. However, we cannot safely reinterpret a Vec<T> into a Vec<u8> in Rust
    /// due to alignment constraints from [`Vec::from_raw_parts`]:
    ///
    /// > `T` needs to have the same alignment as what `ptr` was allocated with.
    /// > (`T` having a less strict alignment is not sufficient, the alignment really
    /// > needs to be equal to satisfy the [`dealloc`] requirement that memory must be
    /// > allocated and deallocated with the same layout.)
    pub fn reinterpret_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
        Self(Buffer::from_vec(vec))
    }

    /// Reinterprets an Arc<[T]> as a LanceBuffer
    ///
    /// This is similar to [`Self::reinterpret_vec`] but for Arc<[T]> instead of Vec<T>
    ///
    /// The same alignment constraints apply
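    ///
    /// # Example
    ///
    /// A minimal sketch wrapping a shared slice:
    ///
    /// ```ignore
    /// let arc: Arc<[u32]> = vec![1_u32, 2].into();
    /// let buf = LanceBuffer::reinterpret_slice(arc);
    /// assert_eq!(buf.len(), 8); // two u32 values, four bytes each
    /// ```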
    pub fn reinterpret_slice<T: ArrowNativeType + RefUnwindSafe>(arc: Arc<[T]>) -> Self {
        let slice = arc.as_ref();
        let data = NonNull::new(slice.as_ptr() as _).unwrap_or(NonNull::dangling());
        let len = std::mem::size_of_val(slice);
        // SAFETY: the ptr will be valid for `len` bytes as long as the Arc<[T]> is valid
        let buffer = unsafe { Buffer::from_custom_allocation(data, len, Arc::new(arc)) };
        Self(buffer)
    }

    /// Reinterprets the LanceBuffer as a typed slice ([`ScalarBuffer<T>`])
    ///
    /// If the underlying buffer is not properly aligned, this will involve a copy of the data
    ///
    /// Note: doing this sort of re-interpretation generally makes assumptions about the endianness
    /// of the data.  Lance does not support big-endian machines so this is safe.  However, if we end
    /// up supporting big-endian machines in the future, then any use of this method will need to be
    /// carefully reviewed.
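    ///
    /// # Example
    ///
    /// A minimal sketch (mirrors the unit tests below):
    ///
    /// ```ignore
    /// let buf = LanceBuffer::reinterpret_vec(vec![1_u32, 2, 3]);
    /// assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), &[1, 2, 3]);
    /// ```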
    pub fn borrow_to_typed_slice<T: ArrowNativeType>(&self) -> ScalarBuffer<T> {
        let align = std::mem::align_of::<T>();
        let is_aligned = self.as_ptr().align_offset(align) == 0;
        if self.len() % std::mem::size_of::<T>() != 0 {
            panic!(
                "attempt to borrow_to_typed_slice to data type of size {} but we have {} bytes which isn't evenly divisible",
                std::mem::size_of::<T>(),
                self.len()
            );
        }

        if is_aligned {
            ScalarBuffer::<T>::from(self.clone().into_buffer())
        } else {
            let num_values = self.len() / std::mem::size_of::<T>();
            let vec = Vec::<T>::with_capacity(num_values);
            let mut bytes = MutableBuffer::from(vec);
            bytes.extend_from_slice(self);
            ScalarBuffer::<T>::from(Buffer::from(bytes))
        }
    }

    /// Concatenates multiple buffers into a single buffer, consuming the input buffers
    ///
    /// If there is only one buffer, it will be returned as is
    pub fn concat_into_one(buffers: Vec<Self>) -> Self {
        if buffers.len() == 1 {
            return buffers.into_iter().next().unwrap();
        }

        let mut total_len = 0;
        for buffer in &buffers {
            total_len += buffer.len();
        }

        let mut data = Vec::with_capacity(total_len);
        for buffer in buffers {
            data.extend_from_slice(buffer.as_ref());
        }

        Self(Buffer::from_vec(data))
    }

    /// Zips multiple buffers into a single buffer, consuming the input buffers
    ///
    /// Unlike [`Self::concat_into_one`], this "zips" the buffers, interleaving the values
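    ///
    /// # Example
    ///
    /// A sketch interleaving a u8 stream with a u16 stream (expected bytes
    /// assume a little-endian host):
    ///
    /// ```ignore
    /// let a = LanceBuffer::from(vec![1_u8, 2]);
    /// let b = LanceBuffer::reinterpret_vec(vec![10_u16, 20]);
    /// let zipped = LanceBuffer::zip_into_one(vec![(a, 8), (b, 16)], 2)?;
    /// // Each value contributes 1 + 2 bytes: [1, 10, 0, 2, 20, 0]
    /// assert_eq!(zipped.len(), 6);
    /// ```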
    pub fn zip_into_one(buffers: Vec<(Self, u64)>, num_values: u64) -> Result<Self> {
        let bytes_per_value = buffers
            .iter()
            .map(|(_, bits_per_value)| {
                if bits_per_value % 8 == 0 {
                    Ok(bits_per_value / 8)
                } else {
                    Err(Error::InvalidInput {
                        source: format!(
                            "LanceBuffer::zip_into_one only supports full-byte buffers currently and received a buffer with {} bits per value",
                            bits_per_value
                        )
                        .into(),
                        location: location!(),
                    })
                }
            })
            .collect::<Result<Vec<_>>>()?;
        let total_bytes_per_value = bytes_per_value.iter().sum::<u64>();
        let total_bytes = (total_bytes_per_value * num_values) as usize;

        let mut zipped = vec![0_u8; total_bytes];
        let mut buffer_ptrs = buffers
            .iter()
            .zip(bytes_per_value)
            .map(|((buffer, _), bytes_per_value)| (buffer.as_ptr(), bytes_per_value as usize))
            .collect::<Vec<_>>();

        let mut zipped_ptr = zipped.as_mut_ptr();
        unsafe {
            let end = zipped_ptr.add(total_bytes);
            while zipped_ptr < end {
                for (buf, bytes_per_value) in buffer_ptrs.iter_mut() {
                    std::ptr::copy_nonoverlapping(*buf, zipped_ptr, *bytes_per_value);
                    zipped_ptr = zipped_ptr.add(*bytes_per_value);
                    *buf = buf.add(*bytes_per_value);
                }
            }
        }

        Ok(Self(Buffer::from_vec(zipped)))
    }

    /// Create a LanceBuffer from a slice
    ///
    /// This is NOT a zero-copy operation.  We can't create a borrowed buffer because
    /// we have no way of extending the lifetime of the slice.
    pub fn copy_slice(slice: &[u8]) -> Self {
        Self(Buffer::from_vec(slice.to_vec()))
    }

    /// Create a LanceBuffer from an array (fixed-size slice)
    ///
    /// This is NOT a zero-copy operation.  The array memory could be on the stack and
    /// thus we can't forget it.
    pub fn copy_array<const N: usize>(array: [u8; N]) -> Self {
        Self(Buffer::from_vec(Vec::from(array)))
    }

    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Returns a new [LanceBuffer] that is a slice of this buffer starting at `offset`,
    /// with `length` bytes.
    ///
    /// Doing so allows the same memory region to be shared between lance buffers.
    ///
    /// # Panics
    ///
    /// Panics if `(offset + length)` is larger than the existing length.
    pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
        let original_buffer_len = self.len();
        assert!(
            offset.saturating_add(length) <= original_buffer_len,
            "the offset + length of the sliced Buffer cannot exceed the existing length"
        );
        Self(self.0.slice_with_length(offset, length))
    }

    /// Returns a new [LanceBuffer] that is a slice of this buffer starting at bit `offset`
    /// with `length` bits.
    ///
    /// Unlike `slice_with_length`, this method allows for slicing at the bit level, but it
    /// requires a copy of the data unless the offset is byte-aligned.
    ///
    /// This method performs the bit slice using the Arrow convention of *bitwise* little-endian.
    ///
    /// This means, given the bit buffer 0bABCDEFGH_IJKLMNOP and a slice starting at bit 3
    /// with length 8, the result will be 0bNOPABCDE
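    ///
    /// # Example
    ///
    /// An illustrative sketch of the convention above (mirrors the unit tests):
    ///
    /// ```ignore
    /// let buf = LanceBuffer::from(vec![0x0F, 0x0B]);
    /// // Bits 3..11, read in bitwise little-endian order, give 0b01100001
    /// assert_eq!(buf.bit_slice_le_with_length(3, 8).as_ref(), &[0x61]);
    /// ```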
    pub fn bit_slice_le_with_length(&self, offset: usize, length: usize) -> Self {
        let sliced = self.0.bit_slice(offset, length);
        Self(sliced)
    }

    /// Get a pointer to the underlying data
    pub fn as_ptr(&self) -> *const u8 {
        self.0.as_ptr()
    }
}

impl AsRef<[u8]> for LanceBuffer {
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}

impl Deref for LanceBuffer {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_ref()
    }
}

// All `From` implementations are zero-copy

impl From<Vec<u8>> for LanceBuffer {
    fn from(buffer: Vec<u8>) -> Self {
        Self(Buffer::from_vec(buffer))
    }
}

impl From<Buffer> for LanceBuffer {
    fn from(buffer: Buffer) -> Self {
        Self(buffer)
    }
}

/// An iterator over the bytes of a [`LanceBuffer`].
///
/// It keeps its own clone of the underlying buffer so it can have a 'static lifetime.
pub struct LanceBufferIter {
    buffer: Buffer,
    index: usize,
}

impl Iterator for LanceBufferIter {
    type Item = u8;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index >= self.buffer.len() {
            None
        } else {
            // SAFETY: we just checked that index is in bounds
            let byte = unsafe { self.buffer.get_unchecked(self.index) };
            self.index += 1;
            Some(*byte)
        }
    }
}

impl IntoIterator for LanceBuffer {
    type Item = u8;
    type IntoIter = LanceBufferIter;

    fn into_iter(self) -> Self::IntoIter {
        LanceBufferIter {
            buffer: self.0,
            index: 0,
        }
    }
}

#[cfg(test)]
mod tests {
    use arrow_buffer::Buffer;

    use super::LanceBuffer;

    #[test]
    fn test_eq() {
        let buf = LanceBuffer::from(Buffer::from_vec(vec![1_u8, 2, 3]));
        let buf2 = LanceBuffer::from(vec![1, 2, 3]);
        assert_eq!(buf, buf2);
    }

    #[test]
    fn test_reinterpret_vec() {
        let vec = vec![1_u32, 2, 3];
        let buf = LanceBuffer::reinterpret_vec(vec);

        let mut expected = Vec::with_capacity(12);
        expected.extend_from_slice(&1_u32.to_ne_bytes());
        expected.extend_from_slice(&2_u32.to_ne_bytes());
        expected.extend_from_slice(&3_u32.to_ne_bytes());
        let expected = LanceBuffer::from(expected);

        assert_eq!(expected, buf);
        assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), vec![1, 2, 3]);
    }

    #[test]
    fn test_concat() {
        let buf1 = LanceBuffer::from(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::from(vec![4_u8, 5, 6]);
        let buf3 = LanceBuffer::from(vec![7_u8, 8, 9]);

        let expected = LanceBuffer::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![buf1, buf2, buf3])
        );

        let empty = LanceBuffer::empty();
        assert_eq!(
            LanceBuffer::empty(),
            LanceBuffer::concat_into_one(vec![empty])
        );

        let expected = LanceBuffer::from(vec![1, 2, 3]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![expected.deep_copy(), LanceBuffer::empty()])
        );
    }

    #[test]
    fn test_zip() {
        let buf1 = LanceBuffer::from(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::reinterpret_vec(vec![1_u16, 2, 3]);
        let buf3 = LanceBuffer::reinterpret_vec(vec![1_u32, 2, 3]);

        let zipped = LanceBuffer::zip_into_one(vec![(buf1, 8), (buf2, 16), (buf3, 32)], 3).unwrap();

        assert_eq!(zipped.len(), 21);

        let mut expected = Vec::with_capacity(21);
        for i in 1..4 {
            expected.push(i as u8);
            expected.extend_from_slice(&(i as u16).to_ne_bytes());
            expected.extend_from_slice(&(i as u32).to_ne_bytes());
        }
        let expected = LanceBuffer::from(expected);

        assert_eq!(expected, zipped);
    }

    #[test]
    fn test_hex() {
        let buf = LanceBuffer::from(vec![1, 2, 15, 20]);
        assert_eq!("01020F14", buf.as_hex());
    }

    #[test]
    #[should_panic]
    fn test_to_typed_slice_invalid() {
        let buf = LanceBuffer::from(vec![0, 1, 2]);
        buf.borrow_to_typed_slice::<u16>();
    }

    #[test]
    fn test_to_typed_slice() {
        // Buffer is aligned, no copy will be made, so both calls
        // should get the same ptr
        let buf = LanceBuffer::from(vec![0, 1]);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_eq!(view_ptr, view_ptr2);

        let bytes = bytes::Bytes::from(vec![0, 1, 2]);
        let sliced = bytes.slice(1..3);
        // Intentionally LYING about the alignment here (we will read u16 values
        // from a buffer that is only u8-aligned) to force the copy path
        let buf = LanceBuffer::from_bytes(sliced, 1);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_ne!(view_ptr, view_ptr2);
    }

    #[test]
    fn test_bit_slice_le() {
        let buf = LanceBuffer::from(vec![0x0F, 0x0B]);

        // Keep in mind that validity buffers are *bitwise* little-endian
        assert_eq!(buf.bit_slice_le_with_length(0, 4).as_ref(), &[0x0F]);
        assert_eq!(buf.bit_slice_le_with_length(4, 4).as_ref(), &[0x00]);
        assert_eq!(buf.bit_slice_le_with_length(3, 8).as_ref(), &[0x61]);
        assert_eq!(buf.bit_slice_le_with_length(0, 8).as_ref(), &[0x0F]);
        assert_eq!(buf.bit_slice_le_with_length(4, 8).as_ref(), &[0xB0]);
        assert_eq!(buf.bit_slice_le_with_length(4, 12).as_ref(), &[0xB0, 0x00]);
    }
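
    // A small sanity check of the IntoIterator impl above: iterating
    // yields the buffer's bytes in order.
    #[test]
    fn test_into_iter() {
        let buf = LanceBuffer::from(vec![1_u8, 2, 3]);
        let collected: Vec<u8> = buf.into_iter().collect();
        assert_eq!(collected, vec![1, 2, 3]);
    }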
}