1use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc};
7
8use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
9use lance_core::{utils::bit::is_pwr_two, Error, Result};
10use snafu::location;
11
/// An immutable, contiguous buffer of bytes backed by an Arrow [`Buffer`].
///
/// Cloning is cheap (the underlying `Buffer` is reference counted); use
/// `deep_copy` when an independent allocation is required.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LanceBuffer(Buffer);
23
24impl LanceBuffer {
25 pub fn into_buffer(self) -> Buffer {
27 self.0
28 }
29
30 pub fn all_unset(len: usize) -> Self {
32 Self(Buffer::from_vec(vec![0; len]))
33 }
34
35 pub fn all_set(len: usize) -> Self {
37 Self(Buffer::from_vec(vec![0xff; len]))
38 }
39
40 pub fn empty() -> Self {
42 Self(Buffer::from_vec(Vec::<u8>::new()))
43 }
44
45 pub fn as_hex(&self) -> String {
47 hex::encode_upper(self)
48 }
49
50 pub fn concat(buffers: &[Self]) -> Self {
54 let total_len = buffers.iter().map(|b| b.len()).sum();
55 let mut data = Vec::with_capacity(total_len);
56 for buffer in buffers {
57 data.extend_from_slice(buffer.as_ref());
58 }
59 Self(Buffer::from_vec(data))
60 }
61
62 pub fn as_spaced_hex(&self, bytes_per_word: u32) -> String {
65 let hex = self.as_hex();
66 let chars_per_word = bytes_per_word as usize * 2;
67 let num_words = hex.len() / chars_per_word;
68 let mut spaced_hex = String::with_capacity(hex.len() + num_words);
69 for (i, c) in hex.chars().enumerate() {
70 if i % chars_per_word == 0 && i != 0 {
71 spaced_hex.push(' ');
72 }
73 spaced_hex.push(c);
74 }
75 spaced_hex
76 }
77
78 pub fn from_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
92 if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
93 {
94 let mut buf = Vec::with_capacity(bytes.len());
96 buf.extend_from_slice(&bytes);
97 Self(Buffer::from_vec(buf))
98 } else {
99 unsafe {
102 Self(Buffer::from_custom_allocation(
103 NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
104 bytes.len(),
105 Arc::new(bytes),
106 ))
107 }
108 }
109 }
110
111 pub fn into_bytes(self) -> bytes::Bytes {
115 self.0.into_vec::<u8>().unwrap().into()
116 }
117
118 pub fn to_owned(&self) -> Self {
120 Self(Buffer::from_vec(self.0.to_vec()))
121 }
122
123 pub fn deep_copy(&self) -> Self {
125 Self(Buffer::from_vec(self.0.to_vec()))
126 }
127
128 pub fn reinterpret_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
139 Self(Buffer::from_vec(vec))
140 }
141
142 pub fn reinterpret_slice<T: ArrowNativeType + RefUnwindSafe>(arc: Arc<[T]>) -> Self {
148 let slice = arc.as_ref();
149 let data = NonNull::new(slice.as_ptr() as _).unwrap_or(NonNull::dangling());
150 let len = std::mem::size_of_val(slice);
151 let buffer = unsafe { Buffer::from_custom_allocation(data, len, Arc::new(arc)) };
153 Self(buffer)
154 }
155
156 pub fn borrow_to_typed_slice<T: ArrowNativeType>(&self) -> ScalarBuffer<T> {
165 let align = std::mem::align_of::<T>();
166 let is_aligned = self.as_ptr().align_offset(align) == 0;
167 if self.len() % std::mem::size_of::<T>() != 0 {
168 panic!("attempt to borrow_to_typed_slice to data type of size {} but we have {} bytes which isn't evenly divisible", std::mem::size_of::<T>(), self.len());
169 }
170
171 if is_aligned {
172 ScalarBuffer::<T>::from(self.clone().into_buffer())
173 } else {
174 let num_values = self.len() / std::mem::size_of::<T>();
175 let vec = Vec::<T>::with_capacity(num_values);
176 let mut bytes = MutableBuffer::from(vec);
177 bytes.extend_from_slice(self);
178 ScalarBuffer::<T>::from(Buffer::from(bytes))
179 }
180 }
181
182 pub fn concat_into_one(buffers: Vec<Self>) -> Self {
186 if buffers.len() == 1 {
187 return buffers.into_iter().next().unwrap();
188 }
189
190 let mut total_len = 0;
191 for buffer in &buffers {
192 total_len += buffer.len();
193 }
194
195 let mut data = Vec::with_capacity(total_len);
196 for buffer in buffers {
197 data.extend_from_slice(buffer.as_ref());
198 }
199
200 Self(Buffer::from_vec(data))
201 }
202
203 pub fn zip_into_one(buffers: Vec<(Self, u64)>, num_values: u64) -> Result<Self> {
207 let bytes_per_value = buffers.iter().map(|(_, bits_per_value)| {
208 if bits_per_value % 8 == 0 {
209 Ok(bits_per_value / 8)
210 } else {
211 Err(Error::InvalidInput { source: format!("LanceBuffer::zip_into_one only supports full-byte buffers currently and received a buffer with {} bits per value", bits_per_value).into(), location: location!() })
212 }
213 }).collect::<Result<Vec<_>>>()?;
214 let total_bytes_per_value = bytes_per_value.iter().sum::<u64>();
215 let total_bytes = (total_bytes_per_value * num_values) as usize;
216
217 let mut zipped = vec![0_u8; total_bytes];
218 let mut buffer_ptrs = buffers
219 .iter()
220 .zip(bytes_per_value)
221 .map(|((buffer, _), bytes_per_value)| (buffer.as_ptr(), bytes_per_value as usize))
222 .collect::<Vec<_>>();
223
224 let mut zipped_ptr = zipped.as_mut_ptr();
225 unsafe {
226 let end = zipped_ptr.add(total_bytes);
227 while zipped_ptr < end {
228 for (buf, bytes_per_value) in buffer_ptrs.iter_mut() {
229 std::ptr::copy_nonoverlapping(*buf, zipped_ptr, *bytes_per_value);
230 zipped_ptr = zipped_ptr.add(*bytes_per_value);
231 *buf = buf.add(*bytes_per_value);
232 }
233 }
234 }
235
236 Ok(Self(Buffer::from_vec(zipped)))
237 }
238
239 pub fn copy_slice(slice: &[u8]) -> Self {
244 Self(Buffer::from_vec(slice.to_vec()))
245 }
246
247 pub fn copy_array<const N: usize>(array: [u8; N]) -> Self {
252 Self(Buffer::from_vec(Vec::from(array)))
253 }
254
255 #[allow(clippy::len_without_is_empty)]
256 pub fn len(&self) -> usize {
257 self.0.len()
258 }
259
260 pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
266 let original_buffer_len = self.len();
267 assert!(
268 offset.saturating_add(length) <= original_buffer_len,
269 "the offset + length of the sliced Buffer cannot exceed the existing length"
270 );
271 Self(self.0.slice_with_length(offset, length))
272 }
273
274 pub fn bit_slice_le_with_length(&self, offset: usize, length: usize) -> Self {
285 let sliced = self.0.bit_slice(offset, length);
286 Self(sliced)
287 }
288
289 pub fn as_ptr(&self) -> *const u8 {
291 self.0.as_ptr()
292 }
293}
294
295impl AsRef<[u8]> for LanceBuffer {
296 fn as_ref(&self) -> &[u8] {
297 self.0.as_slice()
298 }
299}
300
301impl Deref for LanceBuffer {
302 type Target = [u8];
303
304 fn deref(&self) -> &Self::Target {
305 self.as_ref()
306 }
307}
308
309impl From<Vec<u8>> for LanceBuffer {
312 fn from(buffer: Vec<u8>) -> Self {
313 Self(Buffer::from_vec(buffer))
314 }
315}
316
317impl From<Buffer> for LanceBuffer {
318 fn from(buffer: Buffer) -> Self {
319 Self(buffer)
320 }
321}
322
/// An iterator that yields the bytes of a [`LanceBuffer`] one at a time.
pub struct LanceBufferIter {
    // The buffer being iterated over (owned by the iterator)
    buffer: Buffer,
    // Position of the next byte to yield
    index: usize,
}
329
330impl Iterator for LanceBufferIter {
331 type Item = u8;
332
333 fn next(&mut self) -> Option<Self::Item> {
334 if self.index >= self.buffer.len() {
335 None
336 } else {
337 let byte = unsafe { self.buffer.get_unchecked(self.index) };
339 self.index += 1;
340 Some(*byte)
341 }
342 }
343}
344
345impl IntoIterator for LanceBuffer {
346 type Item = u8;
347 type IntoIter = LanceBufferIter;
348
349 fn into_iter(self) -> Self::IntoIter {
350 LanceBufferIter {
351 buffer: self.0,
352 index: 0,
353 }
354 }
355}
356
#[cfg(test)]
mod tests {
    use arrow_buffer::Buffer;

    use super::LanceBuffer;

    // Equality should depend only on the bytes, not on how the buffer was
    // constructed (from an arrow Buffer vs. from a Vec<u8>).
    #[test]
    fn test_eq() {
        let buf = LanceBuffer::from(Buffer::from_vec(vec![1_u8, 2, 3]));
        let buf2 = LanceBuffer::from(vec![1, 2, 3]);
        assert_eq!(buf, buf2);
    }

    // reinterpret_vec should expose the u32 values as their native-endian
    // bytes, and borrow_to_typed_slice should round-trip them back.
    #[test]
    fn test_reinterpret_vec() {
        let vec = vec![1_u32, 2, 3];
        let buf = LanceBuffer::reinterpret_vec(vec);

        let mut expected = Vec::with_capacity(12);
        expected.extend_from_slice(&1_u32.to_ne_bytes());
        expected.extend_from_slice(&2_u32.to_ne_bytes());
        expected.extend_from_slice(&3_u32.to_ne_bytes());
        let expected = LanceBuffer::from(expected);

        assert_eq!(expected, buf);
        assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), vec![1, 2, 3]);
    }

    // concat_into_one: multiple buffers are copied back-to-back, a single
    // buffer is returned as-is, and empty buffers contribute nothing.
    #[test]
    fn test_concat() {
        let buf1 = LanceBuffer::from(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::from(vec![4_u8, 5, 6]);
        let buf3 = LanceBuffer::from(vec![7_u8, 8, 9]);

        let expected = LanceBuffer::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![buf1, buf2, buf3])
        );

        let empty = LanceBuffer::empty();
        assert_eq!(
            LanceBuffer::empty(),
            LanceBuffer::concat_into_one(vec![empty])
        );

        let expected = LanceBuffer::from(vec![1, 2, 3]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![expected.deep_copy(), LanceBuffer::empty()])
        );
    }

    // zip_into_one interleaves one value from each buffer per row:
    // 1 byte + 2 bytes + 4 bytes = 7 bytes per row, 3 rows = 21 bytes total.
    #[test]
    fn test_zip() {
        let buf1 = LanceBuffer::from(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::reinterpret_vec(vec![1_u16, 2, 3]);
        let buf3 = LanceBuffer::reinterpret_vec(vec![1_u32, 2, 3]);

        let zipped = LanceBuffer::zip_into_one(vec![(buf1, 8), (buf2, 16), (buf3, 32)], 3).unwrap();

        assert_eq!(zipped.len(), 21);

        let mut expected = Vec::with_capacity(21);
        for i in 1..4 {
            expected.push(i as u8);
            expected.extend_from_slice(&(i as u16).to_ne_bytes());
            expected.extend_from_slice(&(i as u32).to_ne_bytes());
        }
        let expected = LanceBuffer::from(expected);

        assert_eq!(expected, zipped);
    }

    // Hex rendering is upper-case, two zero-padded digits per byte:
    // 1 -> "01", 2 -> "02", 15 -> "0F", 20 -> "14".
    #[test]
    fn test_hex() {
        let buf = LanceBuffer::from(vec![1, 2, 15, 20]);
        assert_eq!("01020F14", buf.as_hex());
    }

    // 3 bytes is not a multiple of size_of::<u16>(), so the borrow must panic.
    #[test]
    #[should_panic]
    fn test_to_typed_slice_invalid() {
        let buf = LanceBuffer::from(vec![0, 1, 2]);
        buf.borrow_to_typed_slice::<u16>();
    }

    #[test]
    fn test_to_typed_slice() {
        // An aligned buffer is borrowed zero-copy, so repeated borrows view
        // the same memory (identical pointers).
        let buf = LanceBuffer::from(vec![0, 1]);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_eq!(view_ptr, view_ptr2);

        // Slicing Bytes at an odd offset yields data misaligned for u16, so
        // each borrow copies into a fresh allocation (pointers differ).
        let bytes = bytes::Bytes::from(vec![0, 1, 2]);
        let sliced = bytes.slice(1..3);
        let buf = LanceBuffer::from_bytes(sliced, 1);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_ne!(view_ptr, view_ptr2);
    }

    // Bit-granularity slicing of [0x0F, 0x0B]; bits within a byte are taken
    // least-significant first, so a non-byte-aligned offset shifts bits in
    // from the next byte, e.g. (0x0F >> 3) | (0x0B << 5) = 0x61.
    #[test]
    fn test_bit_slice_le() {
        let buf = LanceBuffer::from(vec![0x0F, 0x0B]);

        assert_eq!(buf.bit_slice_le_with_length(0, 4).as_ref(), &[0x0F]);
        assert_eq!(buf.bit_slice_le_with_length(4, 4).as_ref(), &[0x00]);
        // (0x0F >> 3) | (0x0B << 5) = 0x61
        assert_eq!(buf.bit_slice_le_with_length(3, 8).as_ref(), &[0x61]);
        assert_eq!(buf.bit_slice_le_with_length(0, 8).as_ref(), &[0x0F]);
        // (0x0F >> 4) | (0x0B << 4) = 0xB0
        assert_eq!(buf.bit_slice_le_with_length(4, 8).as_ref(), &[0xB0]);
        assert_eq!(buf.bit_slice_le_with_length(4, 12).as_ref(), &[0xB0, 0x00]);
    }
}