use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc};

use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
use itertools::Either;
use snafu::location;

use lance_core::{utils::bit::is_pwr_two, Error, Result};

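/// A byte buffer that is either borrowed from an Arrow [`Buffer`] or owned as a
/// `Vec<u8>`
///
/// Borrowed buffers are cheap to clone (the underlying allocation is reference
/// counted) but read-only. Owned buffers own their bytes outright.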
pub enum LanceBuffer {
    /// A read-only buffer borrowed from an Arrow [`Buffer`]
    Borrowed(Buffer),
    /// A buffer whose bytes are owned directly
    Owned(Vec<u8>),
}

impl PartialEq for LanceBuffer {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Borrowed(l0), Self::Borrowed(r0)) => l0 == r0,
            (Self::Owned(l0), Self::Owned(r0)) => l0 == r0,
            (Self::Borrowed(l0), Self::Owned(r0)) => l0.as_slice() == r0.as_slice(),
            (Self::Owned(l0), Self::Borrowed(r0)) => l0.as_slice() == r0.as_slice(),
        }
    }
}

impl Eq for LanceBuffer {}

impl std::fmt::Debug for LanceBuffer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only preview the first 10 bytes so large buffers stay readable
        let preview = if self.len() > 10 {
            format!("0x{}...", hex::encode_upper(&self[..10]))
        } else {
            format!("0x{}", hex::encode_upper(self.as_ref()))
        };
        match self {
            Self::Borrowed(buffer) => write!(
                f,
                "LanceBuffer::Borrowed(bytes={} #bytes={})",
                preview,
                buffer.len()
            ),
            Self::Owned(buffer) => write!(
                f,
                "LanceBuffer::Owned(bytes={} #bytes={})",
                preview,
                buffer.len()
            ),
        }
    }
}

impl LanceBuffer {
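    /// Convert into an owned `Vec<u8>`, copying the bytes if the buffer is borrowed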
    pub fn into_owned(self) -> Vec<u8> {
        match self {
            Self::Borrowed(buffer) => buffer.to_vec(),
            Self::Owned(buffer) => buffer,
        }
    }

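    /// Convert into an Arrow [`Buffer`] without copying the underlying bytes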
    pub fn into_buffer(self) -> Buffer {
        match self {
            Self::Borrowed(buffer) => buffer,
            Self::Owned(buffer) => Buffer::from_vec(buffer),
        }
    }

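    /// Create an owned buffer of `len` bytes, all set to 0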
    pub fn all_unset(len: usize) -> Self {
        Self::Owned(vec![0; len])
    }

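    /// Create an owned buffer of `len` bytes, all set to 0xFF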
    pub fn all_set(len: usize) -> Self {
        Self::Owned(vec![0xff; len])
    }

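    /// Create an empty buffer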
    pub fn empty() -> Self {
        Self::Owned(Vec::new())
    }

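    /// Render the buffer as an uppercase hex string, mainly for debugging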
    pub fn as_hex(&self) -> String {
        hex::encode_upper(self)
    }

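    /// Concatenate the given buffers into a single new owned buffer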
    pub fn concat(buffers: &[Self]) -> Self {
        let total_len = buffers.iter().map(|b| b.len()).sum();
        let mut data = Vec::with_capacity(total_len);
        for buffer in buffers {
            data.extend_from_slice(buffer.as_ref());
        }
        Self::Owned(data)
    }

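    /// Render the buffer as an uppercase hex string with a space after every
    /// `bytes_per_word` bytes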
    pub fn as_spaced_hex(&self, bytes_per_word: u32) -> String {
        let hex = self.as_hex();
        let chars_per_word = bytes_per_word as usize * 2;
        let num_words = hex.len() / chars_per_word;
        let mut spaced_hex = String::with_capacity(hex.len() + num_words);
        for (i, c) in hex.chars().enumerate() {
            if i % chars_per_word == 0 && i != 0 {
                spaced_hex.push(' ');
            }
            spaced_hex.push(c);
        }
        spaced_hex
    }

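    /// Create a `LanceBuffer` from a `bytes::Bytes` object
    ///
    /// If `bytes_per_value` is a power of two and the underlying allocation is
    /// not aligned to it, the bytes are copied into a new owned buffer (a fresh
    /// allocation is, in practice, aligned well enough for primitive values).
    /// Otherwise the bytes are wrapped zero-copy in a borrowed Arrow buffer.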
    pub fn from_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            // The alignment requirement cannot be satisfied zero-copy, so copy
            // the data into a freshly allocated buffer
            let mut buf = Vec::with_capacity(bytes.len());
            buf.extend_from_slice(&bytes);
            Self::Owned(buf)
        } else {
            // SAFETY: the `Bytes` object is stored as the custom allocation,
            // keeping the pointed-to data alive for the lifetime of the buffer
            unsafe {
                Self::Borrowed(Buffer::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                ))
            }
        }
    }

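    /// Convert into a `bytes::Bytes` object
    ///
    /// Note: for a borrowed buffer this goes through `Buffer::into_vec`, which
    /// fails (and thus panics here) if the underlying allocation is shared or
    /// was not allocated as a Rust `Vec`.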
    pub fn into_bytes(self) -> bytes::Bytes {
        match self {
            Self::Owned(buf) => buf.into(),
            Self::Borrowed(buf) => buf.into_vec::<u8>().unwrap().into(),
        }
    }

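    /// Convert to borrowed form, wrapping owned bytes in an Arrow buffer
    ///
    /// This is zero-copy in both cases.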
    pub fn into_borrowed(self) -> Self {
        match self {
            Self::Borrowed(_) => self,
            Self::Owned(buffer) => Self::Borrowed(Buffer::from_vec(buffer)),
        }
    }

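    /// Create an owned copy of the buffer, always copying the underlying bytes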
    pub fn to_owned(&self) -> Self {
        match self {
            Self::Borrowed(buffer) => Self::Owned(buffer.to_vec()),
            Self::Owned(buffer) => Self::Owned(buffer.clone()),
        }
    }

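    /// Clone the buffer without copying the data
    ///
    /// An owned buffer is first converted (in place) to borrowed form so that
    /// `self` and the returned clone can share the same allocation.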
    pub fn borrow_and_clone(&mut self) -> Self {
        match self {
            Self::Borrowed(buffer) => Self::Borrowed(buffer.clone()),
            Self::Owned(buffer) => {
                // Move the bytes into an Arrow buffer, keep one reference for
                // ourselves, and hand the other to the caller
                let buf_data = std::mem::take(buffer);
                let buffer = Buffer::from_vec(buf_data);
                *self = Self::Borrowed(buffer.clone());
                Self::Borrowed(buffer)
            }
        }
    }

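    /// Try to clone the buffer without copying the data
    ///
    /// Fails if the buffer is owned, since cloning owned bytes would require a
    /// copy. Use [`Self::borrow_and_clone`] instead when a mutable reference is
    /// available.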
    pub fn try_clone(&self) -> Result<Self> {
        match self {
            Self::Borrowed(buffer) => Ok(Self::Borrowed(buffer.clone())),
            Self::Owned(_) => Err(Error::Internal {
                message: "try_clone called on an owned buffer".to_string(),
                location: location!(),
            }),
        }
    }

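    /// Create a deep copy of the buffer, cloning the underlying bytes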
    pub fn deep_copy(&self) -> Self {
        match self {
            Self::Borrowed(buffer) => Self::Owned(buffer.to_vec()),
            Self::Owned(buffer) => Self::Owned(buffer.clone()),
        }
    }

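    /// Reinterpret a `Vec` of native values as a buffer of bytes, zero-copy
    ///
    /// The resulting bytes are in the native endianness of the machine.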
    pub fn reinterpret_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
        Self::Borrowed(Buffer::from_vec(vec))
    }

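    /// Reinterpret an `Arc<[T]>` of native values as a buffer of bytes, zero-copy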
    pub fn reinterpret_slice<T: ArrowNativeType + RefUnwindSafe>(arc: Arc<[T]>) -> Self {
        let slice = arc.as_ref();
        let data = NonNull::new(slice.as_ptr() as _).unwrap_or(NonNull::dangling());
        let len = std::mem::size_of_val(slice);
        // SAFETY: the `Arc` is stored as the custom allocation, keeping the
        // slice alive for the lifetime of the buffer
        let buffer = unsafe { Buffer::from_custom_allocation(data, len, Arc::new(arc)) };
        Self::Borrowed(buffer)
    }

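    /// View the buffer as a typed [`ScalarBuffer`]
    ///
    /// If the data is already aligned for `T` this is zero-copy (the buffer is
    /// converted to borrowed form via [`Self::borrow_and_clone`]). Otherwise the
    /// bytes are copied into a new, properly aligned allocation on every call.
    ///
    /// Panics if the buffer's length is not a multiple of `size_of::<T>()`.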
    pub fn borrow_to_typed_slice<T: ArrowNativeType>(&mut self) -> ScalarBuffer<T> {
        let align = std::mem::align_of::<T>();
        let is_aligned = self.as_ptr().align_offset(align) == 0;
        if self.len() % std::mem::size_of::<T>() != 0 {
            panic!(
                "attempt to borrow_to_typed_slice to data type of size {} but we have {} bytes which isn't evenly divisible",
                std::mem::size_of::<T>(),
                self.len()
            );
        }

        if is_aligned {
            ScalarBuffer::<T>::from(self.borrow_and_clone().into_buffer())
        } else {
            // Copy into a new buffer whose allocation is aligned for `T`
            let num_values = self.len() / std::mem::size_of::<T>();
            let vec = Vec::<T>::with_capacity(num_values);
            let mut bytes = MutableBuffer::from(vec);
            bytes.extend_from_slice(self);
            ScalarBuffer::<T>::from(Buffer::from(bytes))
        }
    }

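    /// Concatenate many buffers into a single owned buffer
    ///
    /// If exactly one buffer is given it is returned as-is, without a copy.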
    pub fn concat_into_one(buffers: Vec<Self>) -> Self {
        if buffers.len() == 1 {
            return buffers.into_iter().next().unwrap();
        }

        let mut total_len = 0;
        for buffer in &buffers {
            total_len += buffer.len();
        }

        let mut data = Vec::with_capacity(total_len);
        for buffer in buffers {
            data.extend_from_slice(buffer.as_ref());
        }

        Self::Owned(data)
    }

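    /// Zip the buffers together, value by value, into a single owned buffer
    ///
    /// Each input is a buffer of fixed-width values paired with its bits per
    /// value; only whole-byte widths are currently supported. For example,
    /// zipping a `u8` buffer `[1, 2]` with a `u16` buffer `[3, 4]` yields the
    /// bytes of `1u8`, `3u16`, `2u8`, `4u16`, in that order, with each value's
    /// bytes kept contiguous.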
    pub fn zip_into_one(buffers: Vec<(Self, u64)>, num_values: u64) -> Result<Self> {
        let bytes_per_value = buffers
            .iter()
            .map(|(_, bits_per_value)| {
                if bits_per_value % 8 == 0 {
                    Ok(bits_per_value / 8)
                } else {
                    Err(Error::InvalidInput {
                        source: format!(
                            "LanceBuffer::zip_into_one only supports full-byte buffers currently and received a buffer with {} bits per value",
                            bits_per_value
                        )
                        .into(),
                        location: location!(),
                    })
                }
            })
            .collect::<Result<Vec<_>>>()?;
        let total_bytes_per_value = bytes_per_value.iter().sum::<u64>();
        let total_bytes = (total_bytes_per_value * num_values) as usize;

        let mut zipped = vec![0_u8; total_bytes];
        let mut buffer_ptrs = buffers
            .iter()
            .zip(bytes_per_value)
            .map(|((buffer, _), bytes_per_value)| (buffer.as_ptr(), bytes_per_value as usize))
            .collect::<Vec<_>>();

        let mut zipped_ptr = zipped.as_mut_ptr();
        // SAFETY: `zipped` has room for `num_values` values from each input,
        // assuming each input buffer holds at least `num_values` values
        unsafe {
            let end = zipped_ptr.add(total_bytes);
            while zipped_ptr < end {
                // Copy one value from each input, advancing all the pointers
                for (buf, bytes_per_value) in buffer_ptrs.iter_mut() {
                    std::ptr::copy_nonoverlapping(*buf, zipped_ptr, *bytes_per_value);
                    zipped_ptr = zipped_ptr.add(*bytes_per_value);
                    *buf = buf.add(*bytes_per_value);
                }
            }
        }

        Ok(Self::Owned(zipped))
    }

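    /// Create an owned copy of the given slice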
    pub fn copy_slice(slice: &[u8]) -> Self {
        Self::Owned(slice.to_vec())
    }

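    /// Create an owned buffer from a fixed-size byte array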
    pub fn copy_array<const N: usize>(array: [u8; N]) -> Self {
        Self::Owned(Vec::from(array))
    }

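    /// Return the number of bytes in the buffer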
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        match self {
            Self::Borrowed(buffer) => buffer.len(),
            Self::Owned(buffer) => buffer.len(),
        }
    }

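    /// Slice the buffer to `length` bytes starting at `offset`
    ///
    /// This is zero-copy for borrowed buffers but copies the sliced range for
    /// owned buffers. Panics if `offset + length` exceeds the buffer's length.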
    pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
        let original_buffer_len = self.len();
        assert!(
            offset.saturating_add(length) <= original_buffer_len,
            "the offset + length of the sliced Buffer cannot exceed the existing length"
        );
        match self {
            Self::Borrowed(buffer) => Self::Borrowed(buffer.slice_with_length(offset, length)),
            Self::Owned(buffer) => Self::Owned(buffer[offset..offset + length].to_vec()),
        }
    }

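    /// Slice `len` bits out of an Arrow buffer starting at bit `offset`
    ///
    /// Zero-copy when the offset is byte-aligned; otherwise the bits are shifted
    /// down into a new buffer.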
    fn arrow_bit_slice(
        buf: &arrow_buffer::Buffer,
        offset: usize,
        len: usize,
    ) -> arrow_buffer::Buffer {
        if offset % 8 == 0 {
            return buf.slice_with_length(offset / 8, len.div_ceil(8));
        }

        // The identity op re-packs the unaligned bits into a fresh buffer
        arrow_buffer::bitwise_unary_op_helper(buf, offset, len, |a| a)
    }

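    /// Slice `length` bits out of the buffer starting at bit `offset`, treating
    /// the data as little-endian bit-packed values
    ///
    /// The buffer is converted to borrowed form in the process, which is why
    /// this takes `&mut self`.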
    pub fn bit_slice_le_with_length(&mut self, offset: usize, length: usize) -> Self {
        let Self::Borrowed(borrowed) = self.borrow_and_clone() else {
            unreachable!()
        };
        let sliced = Self::arrow_bit_slice(&borrowed, offset, length);
        Self::Borrowed(sliced)
    }
}

impl AsRef<[u8]> for LanceBuffer {
    fn as_ref(&self) -> &[u8] {
        match self {
            Self::Borrowed(buffer) => buffer.as_slice(),
            Self::Owned(buffer) => buffer.as_slice(),
        }
    }
}

impl Deref for LanceBuffer {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_ref()
    }
}

impl From<Vec<u8>> for LanceBuffer {
    fn from(buffer: Vec<u8>) -> Self {
        Self::Owned(buffer)
    }
}

impl From<Buffer> for LanceBuffer {
    fn from(buffer: Buffer) -> Self {
        Self::Borrowed(buffer)
    }
}

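/// A byte iterator over a borrowed Arrow buffer, used to implement
/// `IntoIterator` for [`LanceBuffer`]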
pub struct BorrowedBufferIter {
    buffer: arrow_buffer::Buffer,
    index: usize,
}

impl Iterator for BorrowedBufferIter {
    type Item = u8;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index >= self.buffer.len() {
            None
        } else {
            // SAFETY: the index is bounds-checked against the buffer length above
            let byte = unsafe { self.buffer.get_unchecked(self.index) };
            self.index += 1;
            Some(*byte)
        }
    }
}

impl IntoIterator for LanceBuffer {
    type Item = u8;
    type IntoIter = Either<std::vec::IntoIter<u8>, BorrowedBufferIter>;

    fn into_iter(self) -> Self::IntoIter {
        match self {
            Self::Borrowed(buffer) => Either::Right(BorrowedBufferIter { buffer, index: 0 }),
            Self::Owned(buffer) => Either::Left(buffer.into_iter()),
        }
    }
}

#[cfg(test)]
mod tests {
    use arrow_buffer::Buffer;

    use super::LanceBuffer;

    #[test]
    fn test_eq() {
        let buf = LanceBuffer::Borrowed(Buffer::from_vec(vec![1_u8, 2, 3]));
        let buf2 = LanceBuffer::Owned(vec![1, 2, 3]);
        assert_eq!(buf, buf2);
    }

    #[test]
    fn test_reinterpret_vec() {
        let vec = vec![1_u32, 2, 3];
        let mut buf = LanceBuffer::reinterpret_vec(vec);

        let mut expected = Vec::with_capacity(12);
        expected.extend_from_slice(&1_u32.to_ne_bytes());
        expected.extend_from_slice(&2_u32.to_ne_bytes());
        expected.extend_from_slice(&3_u32.to_ne_bytes());
        let expected = LanceBuffer::Owned(expected);

        assert_eq!(expected, buf);
        assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), vec![1, 2, 3]);
    }

    #[test]
    fn test_concat() {
        let buf1 = LanceBuffer::Owned(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::Owned(vec![4_u8, 5, 6]);
        let buf3 = LanceBuffer::Owned(vec![7_u8, 8, 9]);

        let expected = LanceBuffer::Owned(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![buf1, buf2, buf3])
        );

        let empty = LanceBuffer::empty();
        assert_eq!(
            LanceBuffer::empty(),
            LanceBuffer::concat_into_one(vec![empty])
        );

        let expected = LanceBuffer::Owned(vec![1, 2, 3]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![expected.deep_copy(), LanceBuffer::empty()])
        );
    }

    #[test]
    fn test_zip() {
        let buf1 = LanceBuffer::Owned(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::reinterpret_vec(vec![1_u16, 2, 3]);
        let buf3 = LanceBuffer::reinterpret_vec(vec![1_u32, 2, 3]);

        let zipped = LanceBuffer::zip_into_one(vec![(buf1, 8), (buf2, 16), (buf3, 32)], 3).unwrap();

        assert_eq!(zipped.len(), 21);

        let mut expected = Vec::with_capacity(21);
        for i in 1..4 {
            expected.push(i as u8);
            expected.extend_from_slice(&(i as u16).to_ne_bytes());
            expected.extend_from_slice(&(i as u32).to_ne_bytes());
        }
        let expected = LanceBuffer::Owned(expected);

        assert_eq!(expected, zipped);
    }

    #[test]
    fn test_hex() {
        let buf = LanceBuffer::Owned(vec![1, 2, 15, 20]);
        assert_eq!("01020F14", buf.as_hex());
    }

    #[test]
    #[should_panic]
    fn test_to_typed_slice_invalid() {
        // 3 bytes is not an even number of u16 values
        let mut buf = LanceBuffer::Owned(vec![0, 1, 2]);
        buf.borrow_to_typed_slice::<u16>();
    }

    #[test]
    fn test_to_typed_slice() {
        // Aligned data should be returned zero-copy: borrowing twice yields
        // views into the same underlying allocation
        let mut buf = LanceBuffer::Owned(vec![0, 1]);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_eq!(view_ptr, view_ptr2);

        // Unaligned data must be copied on every borrow, so the two views point
        // at different allocations
        let bytes = bytes::Bytes::from(vec![0, 1, 2]);
        let sliced = bytes.slice(1..3);
        let mut buf = LanceBuffer::from_bytes(sliced, 1);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_ne!(view_ptr, view_ptr2);
    }

    #[test]
    fn test_bit_slice_le() {
        // 0x0F = 0b00001111, 0x0B = 0b00001011 (bit 0 is the least significant
        // bit of the first byte)
        let mut buf = LanceBuffer::Owned(vec![0x0F, 0x0B]);

        assert_eq!(buf.bit_slice_le_with_length(0, 4).as_ref(), &[0x0F]);
        assert_eq!(buf.bit_slice_le_with_length(4, 4).as_ref(), &[0x00]);
        assert_eq!(buf.bit_slice_le_with_length(3, 8).as_ref(), &[0x61]);
        assert_eq!(buf.bit_slice_le_with_length(0, 8).as_ref(), &[0x0F]);
        assert_eq!(buf.bit_slice_le_with_length(4, 8).as_ref(), &[0xB0]);
        assert_eq!(buf.bit_slice_le_with_length(4, 12).as_ref(), &[0xB0, 0x00]);
    }
}