1use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc};
7
8use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
9use lance_core::{Error, Result, utils::bit::is_pwr_two};
10use std::borrow::Cow;
11
/// An owned, immutable buffer of bytes.
///
/// Thin newtype over an arrow [`Buffer`].  Equality (`PartialEq`/`Eq`) is
/// derived, so two buffers compare equal when their byte contents match,
/// regardless of how each was constructed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LanceBuffer(Buffer);
23
24impl LanceBuffer {
    /// Unwrap into the underlying arrow [`Buffer`].
    pub fn into_buffer(self) -> Buffer {
        self.0
    }

    /// Create a buffer of `len` bytes, all zero.
    pub fn all_unset(len: usize) -> Self {
        Self(Buffer::from_vec(vec![0; len]))
    }

    /// Create a buffer of `len` bytes, all `0xFF`.
    pub fn all_set(len: usize) -> Self {
        Self(Buffer::from_vec(vec![0xff; len]))
    }

    /// Create an empty (zero-length) buffer.
    pub fn empty() -> Self {
        Self(Buffer::from_vec(Vec::<u8>::new()))
    }

    /// Render the bytes as an upper-case hex string (e.g. `"01020F14"`).
    pub fn as_hex(&self) -> String {
        hex::encode_upper(self)
    }
49
50 pub fn concat(buffers: &[Self]) -> Self {
54 let total_len = buffers.iter().map(|b| b.len()).sum();
55 let mut data = Vec::with_capacity(total_len);
56 for buffer in buffers {
57 data.extend_from_slice(buffer.as_ref());
58 }
59 Self(Buffer::from_vec(data))
60 }
61
62 pub fn as_spaced_hex(&self, bytes_per_word: u32) -> String {
65 let hex = self.as_hex();
66 let chars_per_word = bytes_per_word as usize * 2;
67 let num_words = hex.len() / chars_per_word;
68 let mut spaced_hex = String::with_capacity(hex.len() + num_words);
69 for (i, c) in hex.chars().enumerate() {
70 if i % chars_per_word == 0 && i != 0 {
71 spaced_hex.push(' ');
72 }
73 spaced_hex.push(c);
74 }
75 spaced_hex
76 }
77
    /// Wrap a [`bytes::Bytes`] without copying when possible.
    ///
    /// If `bytes_per_value` is a power of two and the data is NOT aligned to
    /// that size, the bytes are copied into a freshly allocated buffer so
    /// later typed views can borrow in place.  Otherwise the `Bytes`
    /// allocation is adopted zero-copy and kept alive by the buffer.
    ///
    /// NOTE(review): a non-power-of-two `bytes_per_value` always takes the
    /// zero-copy path — presumably alignment is irrelevant for such widths;
    /// confirm with callers.
    pub fn from_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            // Misaligned: fall back to a copy.
            let mut buf = Vec::with_capacity(bytes.len());
            buf.extend_from_slice(&bytes);
            Self(Buffer::from_vec(buf))
        } else {
            // SAFETY: the pointer/length describe memory owned by `bytes`,
            // and `Arc::new(bytes)` passed as the custom allocation keeps
            // that memory alive (and immutable) for the Buffer's lifetime.
            unsafe {
                Self(Buffer::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                ))
            }
        }
    }
110
    /// Copy the contents into a newly allocated buffer.
    pub fn deep_copy(&self) -> Self {
        Self(Buffer::from_vec(self.0.to_vec()))
    }

    /// Reinterpret a `Vec<T>` as a byte buffer without copying.
    ///
    /// The `Vec`'s in-memory (native-endian) representation is adopted as-is.
    pub fn reinterpret_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
        Self(Buffer::from_vec(vec))
    }

    /// Reinterpret an `Arc<[T]>` as a byte buffer without copying.
    ///
    /// The `Arc` becomes the buffer's custom allocation, so the slice stays
    /// alive for as long as the buffer does.
    pub fn reinterpret_slice<T: ArrowNativeType + RefUnwindSafe>(arc: Arc<[T]>) -> Self {
        let slice = arc.as_ref();
        // An empty slice may yield a null pointer; substitute a well-aligned
        // dangling pointer, which is valid for a zero-length buffer.
        let data = NonNull::new(slice.as_ptr() as _).unwrap_or(NonNull::dangling());
        let len = std::mem::size_of_val(slice);
        // SAFETY: `data`/`len` describe the slice owned by `arc`, and the
        // `Arc::new(arc)` custom allocation keeps it alive and immutable.
        let buffer = unsafe { Buffer::from_custom_allocation(data, len, Arc::new(arc)) };
        Self(buffer)
    }
143
144 pub fn borrow_to_typed_slice<T: ArrowNativeType>(&self) -> ScalarBuffer<T> {
153 let align = std::mem::align_of::<T>();
154 let is_aligned = self.as_ptr().align_offset(align) == 0;
155 if !self.len().is_multiple_of(std::mem::size_of::<T>()) {
156 panic!(
157 "attempt to borrow_to_typed_slice to data type of size {} but we have {} bytes which isn't evenly divisible",
158 std::mem::size_of::<T>(),
159 self.len()
160 );
161 }
162
163 if is_aligned {
164 ScalarBuffer::<T>::from(self.clone().into_buffer())
165 } else {
166 let num_values = self.len() / std::mem::size_of::<T>();
167 let vec = Vec::<T>::with_capacity(num_values);
168 let mut bytes = MutableBuffer::from(vec);
169 bytes.extend_from_slice(self);
170 ScalarBuffer::<T>::from(Buffer::from(bytes))
171 }
172 }
173
174 pub fn borrow_to_typed_view<T: ArrowNativeType + bytemuck::Pod>(&self) -> Cow<'_, [T]> {
189 let align = std::mem::align_of::<T>();
190 if !self.len().is_multiple_of(std::mem::size_of::<T>()) {
191 panic!(
192 "attempt to view data type of size {} but we have {} bytes which isn't evenly divisible",
193 std::mem::size_of::<T>(),
194 self.len()
195 );
196 }
197
198 if self.as_ptr().align_offset(align) == 0 {
199 Cow::Borrowed(bytemuck::cast_slice(&self.0))
200 } else {
201 Cow::Owned(bytemuck::pod_collect_to_vec(self.0.as_slice()))
202 }
203 }
204
205 pub fn concat_into_one(buffers: Vec<Self>) -> Self {
209 if buffers.len() == 1 {
210 return buffers.into_iter().next().unwrap();
211 }
212
213 let mut total_len = 0;
214 for buffer in &buffers {
215 total_len += buffer.len();
216 }
217
218 let mut data = Vec::with_capacity(total_len);
219 for buffer in buffers {
220 data.extend_from_slice(buffer.as_ref());
221 }
222
223 Self(Buffer::from_vec(data))
224 }
225
226 pub fn zip_into_one(buffers: Vec<(Self, u64)>, num_values: u64) -> Result<Self> {
230 let bytes_per_value = buffers.iter().map(|(_, bits_per_value)| {
231 if bits_per_value % 8 == 0 {
232 Ok(bits_per_value / 8)
233 } else {
234 Err(Error::invalid_input_source(format!("LanceBuffer::zip_into_one only supports full-byte buffers currently and received a buffer with {} bits per value", bits_per_value).into()))
235 }
236 }).collect::<Result<Vec<_>>>()?;
237 let total_bytes_per_value = bytes_per_value.iter().sum::<u64>();
238 let total_bytes = (total_bytes_per_value * num_values) as usize;
239
240 let mut zipped = vec![0_u8; total_bytes];
241 let mut buffer_ptrs = buffers
242 .iter()
243 .zip(bytes_per_value)
244 .map(|((buffer, _), bytes_per_value)| (buffer.as_ptr(), bytes_per_value as usize))
245 .collect::<Vec<_>>();
246
247 let mut zipped_ptr = zipped.as_mut_ptr();
248 unsafe {
249 let end = zipped_ptr.add(total_bytes);
250 while zipped_ptr < end {
251 for (buf, bytes_per_value) in buffer_ptrs.iter_mut() {
252 std::ptr::copy_nonoverlapping(*buf, zipped_ptr, *bytes_per_value);
253 zipped_ptr = zipped_ptr.add(*bytes_per_value);
254 *buf = buf.add(*bytes_per_value);
255 }
256 }
257 }
258
259 Ok(Self(Buffer::from_vec(zipped)))
260 }
261
    /// Copy a byte slice into a new buffer.
    pub fn copy_slice(slice: &[u8]) -> Self {
        Self(Buffer::from_vec(slice.to_vec()))
    }

    /// Copy a fixed-size byte array into a new buffer.
    pub fn copy_array<const N: usize>(array: [u8; N]) -> Self {
        Self(Buffer::from_vec(Vec::from(array)))
    }

    /// Length of the buffer in bytes.
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        self.0.len()
    }
282
283 pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
289 let original_buffer_len = self.len();
290 assert!(
291 offset.saturating_add(length) <= original_buffer_len,
292 "the offset + length of the sliced Buffer cannot exceed the existing length"
293 );
294 Self(self.0.slice_with_length(offset, length))
295 }
296
    /// Slice at bit granularity: `length` bits starting `offset` bits in.
    ///
    /// Delegates to arrow's `Buffer::bit_slice`.  Bit 0 is the
    /// least-significant bit of the first byte (little-endian bit order), and
    /// the result is rounded up to whole bytes — see the `test_bit_slice_le`
    /// cases in this file's test module.
    pub fn bit_slice_le_with_length(&self, offset: usize, length: usize) -> Self {
        let sliced = self.0.bit_slice(offset, length);
        Self(sliced)
    }

    /// Raw pointer to the start of the buffer's bytes.
    pub fn as_ptr(&self) -> *const u8 {
        self.0.as_ptr()
    }
316}
317
impl AsRef<[u8]> for LanceBuffer {
    /// Borrow the buffer's contents as a byte slice.
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}
323
324impl Deref for LanceBuffer {
325 type Target = [u8];
326
327 fn deref(&self) -> &Self::Target {
328 self.as_ref()
329 }
330}
331
impl From<Vec<u8>> for LanceBuffer {
    /// Take ownership of a byte vector (no copy).
    fn from(buffer: Vec<u8>) -> Self {
        Self(Buffer::from_vec(buffer))
    }
}

impl From<Buffer> for LanceBuffer {
    /// Wrap an existing arrow [`Buffer`] (no copy).
    fn from(buffer: Buffer) -> Self {
        Self(buffer)
    }
}
345
/// Byte-by-byte iterator over an owned [`LanceBuffer`].
pub struct LanceBufferIter {
    buffer: Buffer,
    // Position of the next byte to yield.
    index: usize,
}
352
353impl Iterator for LanceBufferIter {
354 type Item = u8;
355
356 fn next(&mut self) -> Option<Self::Item> {
357 if self.index >= self.buffer.len() {
358 None
359 } else {
360 let byte = unsafe { self.buffer.get_unchecked(self.index) };
362 self.index += 1;
363 Some(*byte)
364 }
365 }
366}
367
impl IntoIterator for LanceBuffer {
    type Item = u8;
    type IntoIter = LanceBufferIter;

    /// Consume the buffer, yielding its bytes one at a time.
    fn into_iter(self) -> Self::IntoIter {
        LanceBufferIter {
            buffer: self.0,
            index: 0,
        }
    }
}
379
#[cfg(test)]
mod tests {
    use arrow_buffer::Buffer;

    use super::LanceBuffer;

    #[test]
    fn test_eq() {
        // Equality is by byte content, regardless of how each buffer was built.
        let buf = LanceBuffer::from(Buffer::from_vec(vec![1_u8, 2, 3]));
        let buf2 = LanceBuffer::from(vec![1, 2, 3]);
        assert_eq!(buf, buf2);
    }

    #[test]
    fn test_reinterpret_vec() {
        let vec = vec![1_u32, 2, 3];
        let buf = LanceBuffer::reinterpret_vec(vec);

        // The bytes must be the native-endian representation of the u32s.
        let mut expected = Vec::with_capacity(12);
        expected.extend_from_slice(&1_u32.to_ne_bytes());
        expected.extend_from_slice(&2_u32.to_ne_bytes());
        expected.extend_from_slice(&3_u32.to_ne_bytes());
        let expected = LanceBuffer::from(expected);

        assert_eq!(expected, buf);
        // Round-trip back to a typed slice.
        assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), vec![1, 2, 3]);
    }

    #[test]
    fn test_concat() {
        let buf1 = LanceBuffer::from(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::from(vec![4_u8, 5, 6]);
        let buf3 = LanceBuffer::from(vec![7_u8, 8, 9]);

        let expected = LanceBuffer::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![buf1, buf2, buf3])
        );

        // A single empty buffer concatenates to an empty buffer.
        let empty = LanceBuffer::empty();
        assert_eq!(
            LanceBuffer::empty(),
            LanceBuffer::concat_into_one(vec![empty])
        );

        // Trailing empty buffers don't change the result.
        let expected = LanceBuffer::from(vec![1, 2, 3]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![expected.deep_copy(), LanceBuffer::empty()])
        );
    }

    #[test]
    fn test_zip() {
        // Zip three buffers holding 1-, 2-, and 4-byte values respectively.
        let buf1 = LanceBuffer::from(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::reinterpret_vec(vec![1_u16, 2, 3]);
        let buf3 = LanceBuffer::reinterpret_vec(vec![1_u32, 2, 3]);

        let zipped = LanceBuffer::zip_into_one(vec![(buf1, 8), (buf2, 16), (buf3, 32)], 3).unwrap();

        // 3 values * (1 + 2 + 4) bytes per value.
        assert_eq!(zipped.len(), 21);

        let mut expected = Vec::with_capacity(21);
        for i in 1..4 {
            expected.push(i as u8);
            expected.extend_from_slice(&(i as u16).to_ne_bytes());
            expected.extend_from_slice(&(i as u32).to_ne_bytes());
        }
        let expected = LanceBuffer::from(expected);

        assert_eq!(expected, zipped);
    }

    #[test]
    fn test_hex() {
        let buf = LanceBuffer::from(vec![1, 2, 15, 20]);
        assert_eq!("01020F14", buf.as_hex());
    }

    #[test]
    #[should_panic]
    fn test_to_typed_slice_invalid() {
        // 3 bytes is not evenly divisible by size_of::<u16>() == 2.
        let buf = LanceBuffer::from(vec![0, 1, 2]);
        buf.borrow_to_typed_slice::<u16>();
    }

    #[test]
    fn test_to_typed_slice() {
        // Aligned data: repeated borrows are zero-copy, so both views point
        // at the same memory.
        let buf = LanceBuffer::from(vec![0, 1]);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_eq!(view_ptr, view_ptr2);

        // Data sliced at an odd byte offset: each borrow copies into a fresh
        // aligned buffer, so the two views live at different addresses.
        let bytes = bytes::Bytes::from(vec![0, 1, 2]);
        let sliced = bytes.slice(1..3);
        let buf = LanceBuffer::from_bytes(sliced, 1);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_ne!(view_ptr, view_ptr2);
    }

    #[test]
    fn test_bit_slice_le() {
        let buf = LanceBuffer::from(vec![0x0F, 0x0B]);

        // Bit 0 is the least-significant bit of the first byte; results are
        // rounded up to whole bytes.
        assert_eq!(buf.bit_slice_le_with_length(0, 4).as_ref(), &[0x0F]);
        assert_eq!(buf.bit_slice_le_with_length(4, 4).as_ref(), &[0x00]);
        assert_eq!(buf.bit_slice_le_with_length(3, 8).as_ref(), &[0x61]);
        assert_eq!(buf.bit_slice_le_with_length(0, 8).as_ref(), &[0x0F]);
        assert_eq!(buf.bit_slice_le_with_length(4, 8).as_ref(), &[0xB0]);
        assert_eq!(buf.bit_slice_le_with_length(4, 12).as_ref(), &[0xB0, 0x00]);
    }
}