use std::{borrow::Cow, ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc};

use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
use lance_core::{utils::bit::is_pwr_two, Error, Result};
use snafu::location;

/// A buffer of bytes, wrapping an Arrow [`Buffer`].
///
/// Clones are cheap: the underlying Arrow buffer is reference counted.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LanceBuffer(Buffer);

impl LanceBuffer {
    /// Unwraps the underlying Arrow [`Buffer`].
    pub fn into_buffer(self) -> Buffer {
        self.0
    }

    /// Creates a buffer of `len` bytes, all set to 0.
    pub fn all_unset(len: usize) -> Self {
        Self(Buffer::from_vec(vec![0; len]))
    }

    /// Creates a buffer of `len` bytes, all set to 0xFF.
    pub fn all_set(len: usize) -> Self {
        Self(Buffer::from_vec(vec![0xff; len]))
    }

    /// Creates an empty buffer.
    pub fn empty() -> Self {
        Self(Buffer::from_vec(Vec::<u8>::new()))
    }

    /// Returns the contents as an uppercase hex string.
    pub fn as_hex(&self) -> String {
        hex::encode_upper(self)
    }
    /// Concatenates the given buffers into a single new buffer (always copies).
    pub fn concat(buffers: &[Self]) -> Self {
        let total_len = buffers.iter().map(|b| b.len()).sum();
        let mut data = Vec::with_capacity(total_len);
        for buffer in buffers {
            data.extend_from_slice(buffer.as_ref());
        }
        Self(Buffer::from_vec(data))
    }

    /// Returns the contents as an uppercase hex string with a space every
    /// `bytes_per_word` bytes, for easier visual inspection.
    pub fn as_spaced_hex(&self, bytes_per_word: u32) -> String {
        let hex = self.as_hex();
        let chars_per_word = bytes_per_word as usize * 2;
        let num_words = hex.len() / chars_per_word;
        let mut spaced_hex = String::with_capacity(hex.len() + num_words);
        for (i, c) in hex.chars().enumerate() {
            if i % chars_per_word == 0 && i != 0 {
                spaced_hex.push(' ');
            }
            spaced_hex.push(c);
        }
        spaced_hex
    }
    /// Wraps a `bytes::Bytes` as a buffer, zero-copy when possible.
    ///
    /// If `bytes_per_value` is a power of two and the data is not aligned to
    /// that size, the bytes are copied into a fresh allocation so that later
    /// typed access does not hit misaligned reads.
    pub fn from_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            // The source is misaligned for its value width; deep copy.
            let mut buf = Vec::with_capacity(bytes.len());
            buf.extend_from_slice(&bytes);
            Self(Buffer::from_vec(buf))
        } else {
            // Zero-copy: wrap the `Bytes` and keep it alive as the custom
            // allocation backing the Buffer.
            unsafe {
                Self(Buffer::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                ))
            }
        }
    }
    /// Makes an owned copy of the buffer's bytes.
    pub fn deep_copy(&self) -> Self {
        Self(Buffer::from_vec(self.0.to_vec()))
    }

    /// Reinterprets a `Vec<T>` as a buffer of bytes, zero-copy.
    ///
    /// The bytes are viewed in the native endianness of the machine.
    pub fn reinterpret_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
        Self(Buffer::from_vec(vec))
    }

    /// Reinterprets an `Arc<[T]>` as a buffer of bytes, zero-copy.
    ///
    /// The `Arc` is stored as the buffer's custom allocation, keeping the
    /// slice alive for as long as the buffer is.
    pub fn reinterpret_slice<T: ArrowNativeType + RefUnwindSafe>(arc: Arc<[T]>) -> Self {
        let slice = arc.as_ref();
        let data = NonNull::new(slice.as_ptr() as _).unwrap_or(NonNull::dangling());
        let len = std::mem::size_of_val(slice);
        // SAFETY: the pointer and length come from `slice`, and the `Arc`
        // passed as the allocation keeps that memory alive.
        let buffer = unsafe { Buffer::from_custom_allocation(data, len, Arc::new(arc)) };
        Self(buffer)
    }
    /// Views the buffer as a typed [`ScalarBuffer`].
    ///
    /// If the data is already aligned for `T` this is zero-copy; otherwise
    /// the bytes are copied into a fresh, aligned allocation.
    ///
    /// # Panics
    ///
    /// Panics if the buffer's length is not a multiple of `size_of::<T>()`.
    pub fn borrow_to_typed_slice<T: ArrowNativeType>(&self) -> ScalarBuffer<T> {
        let align = std::mem::align_of::<T>();
        let is_aligned = self.as_ptr().align_offset(align) == 0;
        if self.len() % std::mem::size_of::<T>() != 0 {
            panic!(
                "attempt to borrow_to_typed_slice a data type of size {} but we have {} bytes, which is not evenly divisible",
                std::mem::size_of::<T>(),
                self.len()
            );
        }

        if is_aligned {
            ScalarBuffer::<T>::from(self.clone().into_buffer())
        } else {
            // Misaligned: copy into a MutableBuffer, which is always aligned.
            let num_values = self.len() / std::mem::size_of::<T>();
            let vec = Vec::<T>::with_capacity(num_values);
            let mut bytes = MutableBuffer::from(vec);
            bytes.extend_from_slice(self);
            ScalarBuffer::<T>::from(Buffer::from(bytes))
        }
    }
    /// Views the buffer as a typed slice, borrowing when aligned and copying
    /// into an owned `Vec` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the buffer's length is not a multiple of `size_of::<T>()`.
    pub fn borrow_to_typed_view<T: ArrowNativeType + bytemuck::Pod>(&self) -> Cow<'_, [T]> {
        let align = std::mem::align_of::<T>();
        if self.len() % std::mem::size_of::<T>() != 0 {
            panic!(
                "attempt to view a data type of size {} but we have {} bytes, which is not evenly divisible",
                std::mem::size_of::<T>(),
                self.len()
            );
        }

        if self.as_ptr().align_offset(align) == 0 {
            Cow::Borrowed(bytemuck::cast_slice(&self.0))
        } else {
            Cow::Owned(bytemuck::pod_collect_to_vec(self.0.as_slice()))
        }
    }
    /// Combines the buffers into a single buffer.
    ///
    /// A single input buffer is returned as-is (zero-copy); otherwise the
    /// contents are copied into one new allocation.
    pub fn concat_into_one(buffers: Vec<Self>) -> Self {
        if buffers.len() == 1 {
            return buffers.into_iter().next().unwrap();
        }

        let total_len = buffers.iter().map(|b| b.len()).sum::<usize>();
        let mut data = Vec::with_capacity(total_len);
        for buffer in buffers {
            data.extend_from_slice(buffer.as_ref());
        }

        Self(Buffer::from_vec(data))
    }
    /// Interleaves the buffers value-by-value into a single row-major buffer.
    ///
    /// Each input is a buffer plus its bits-per-value; only whole-byte value
    /// widths are supported today.
    pub fn zip_into_one(buffers: Vec<(Self, u64)>, num_values: u64) -> Result<Self> {
        let bytes_per_value = buffers
            .iter()
            .map(|(_, bits_per_value)| {
                if bits_per_value % 8 == 0 {
                    Ok(bits_per_value / 8)
                } else {
                    Err(Error::InvalidInput {
                        source: format!(
                            "LanceBuffer::zip_into_one only supports full-byte buffers currently and received a buffer with {} bits per value",
                            bits_per_value
                        )
                        .into(),
                        location: location!(),
                    })
                }
            })
            .collect::<Result<Vec<_>>>()?;
        let total_bytes_per_value = bytes_per_value.iter().sum::<u64>();
        let total_bytes = (total_bytes_per_value * num_values) as usize;

        let mut zipped = vec![0_u8; total_bytes];
        let mut buffer_ptrs = buffers
            .iter()
            .zip(bytes_per_value)
            .map(|((buffer, _), bytes_per_value)| (buffer.as_ptr(), bytes_per_value as usize))
            .collect::<Vec<_>>();

        let mut zipped_ptr = zipped.as_mut_ptr();
        // SAFETY: `zipped` has room for exactly `num_values` values from each
        // input, so every copy below stays within both source and destination.
        unsafe {
            let end = zipped_ptr.add(total_bytes);
            while zipped_ptr < end {
                // Copy one value from each input buffer, advancing the
                // per-buffer cursors as we go.
                for (buf, bytes_per_value) in buffer_ptrs.iter_mut() {
                    std::ptr::copy_nonoverlapping(*buf, zipped_ptr, *bytes_per_value);
                    zipped_ptr = zipped_ptr.add(*bytes_per_value);
                    *buf = buf.add(*bytes_per_value);
                }
            }
        }

        Ok(Self(Buffer::from_vec(zipped)))
    }
    /// Creates a buffer by copying the given slice.
    pub fn copy_slice(slice: &[u8]) -> Self {
        Self(Buffer::from_vec(slice.to_vec()))
    }

    /// Creates a buffer by copying the given fixed-size array.
    pub fn copy_array<const N: usize>(array: [u8; N]) -> Self {
        Self(Buffer::from_vec(Vec::from(array)))
    }

    /// Returns the length of the buffer in bytes.
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Returns a zero-copy slice of `length` bytes starting at `offset`.
    ///
    /// # Panics
    ///
    /// Panics if `offset + length` exceeds the buffer's length.
    pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
        let original_buffer_len = self.len();
        assert!(
            offset.saturating_add(length) <= original_buffer_len,
            "the offset + length of the sliced Buffer cannot exceed the existing length"
        );
        Self(self.0.slice_with_length(offset, length))
    }

    /// Slices `length` bits starting at bit `offset`, in little-endian bit
    /// order (bit 0 is the least significant bit of the first byte).
    pub fn bit_slice_le_with_length(&self, offset: usize, length: usize) -> Self {
        let sliced = self.0.bit_slice(offset, length);
        Self(sliced)
    }

    /// Returns a raw pointer to the buffer's data.
    pub fn as_ptr(&self) -> *const u8 {
        self.0.as_ptr()
    }
}

impl AsRef<[u8]> for LanceBuffer {
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}

impl Deref for LanceBuffer {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_ref()
    }
}

impl From<Vec<u8>> for LanceBuffer {
    fn from(buffer: Vec<u8>) -> Self {
        Self(Buffer::from_vec(buffer))
    }
}

impl From<Buffer> for LanceBuffer {
    fn from(buffer: Buffer) -> Self {
        Self(buffer)
    }
}

/// An iterator over the bytes of a [`LanceBuffer`].
pub struct LanceBufferIter {
    buffer: Buffer,
    index: usize,
}

impl Iterator for LanceBufferIter {
    type Item = u8;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index >= self.buffer.len() {
            None
        } else {
            // SAFETY: the bounds check above guarantees `index` is in range.
            let byte = unsafe { self.buffer.get_unchecked(self.index) };
            self.index += 1;
            Some(*byte)
        }
    }
}

impl IntoIterator for LanceBuffer {
    type Item = u8;
    type IntoIter = LanceBufferIter;

    fn into_iter(self) -> Self::IntoIter {
        LanceBufferIter {
            buffer: self.0,
            index: 0,
        }
    }
}

#[cfg(test)]
mod tests {
    use arrow_buffer::Buffer;

    use super::LanceBuffer;

    #[test]
    fn test_eq() {
        let buf = LanceBuffer::from(Buffer::from_vec(vec![1_u8, 2, 3]));
        let buf2 = LanceBuffer::from(vec![1, 2, 3]);
        assert_eq!(buf, buf2);
    }

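    // Sanity checks for the fill and copy constructors; the expected bytes
    // follow directly from their definitions above.
    #[test]
    fn test_fill_constructors() {
        assert_eq!(LanceBuffer::all_set(3).as_ref(), &[0xFF, 0xFF, 0xFF]);
        assert_eq!(LanceBuffer::all_unset(3).as_ref(), &[0, 0, 0]);
        assert_eq!(LanceBuffer::copy_array([1, 2, 3]).as_ref(), &[1, 2, 3]);
        assert_eq!(LanceBuffer::empty().len(), 0);
    }
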
    #[test]
    fn test_reinterpret_vec() {
        let vec = vec![1_u32, 2, 3];
        let buf = LanceBuffer::reinterpret_vec(vec);

        let mut expected = Vec::with_capacity(12);
        expected.extend_from_slice(&1_u32.to_ne_bytes());
        expected.extend_from_slice(&2_u32.to_ne_bytes());
        expected.extend_from_slice(&3_u32.to_ne_bytes());
        let expected = LanceBuffer::from(expected);

        assert_eq!(expected, buf);
        assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), vec![1, 2, 3]);
    }

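    // `reinterpret_slice` should alias the Arc's memory rather than copy it;
    // we check that by comparing raw pointers before and after wrapping.
    #[test]
    fn test_reinterpret_slice() {
        let arc: std::sync::Arc<[u32]> = vec![1_u32, 2, 3].into();
        let ptr = arc.as_ptr() as *const u8;
        let buf = LanceBuffer::reinterpret_slice(arc);
        assert_eq!(buf.as_ptr(), ptr);
        assert_eq!(buf.len(), 12);
        assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), &[1, 2, 3]);
    }
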
    #[test]
    fn test_concat() {
        let buf1 = LanceBuffer::from(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::from(vec![4_u8, 5, 6]);
        let buf3 = LanceBuffer::from(vec![7_u8, 8, 9]);

        let expected = LanceBuffer::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![buf1, buf2, buf3])
        );

        let empty = LanceBuffer::empty();
        assert_eq!(
            LanceBuffer::empty(),
            LanceBuffer::concat_into_one(vec![empty])
        );

        let expected = LanceBuffer::from(vec![1, 2, 3]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![expected.deep_copy(), LanceBuffer::empty()])
        );
    }

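    // `slice_with_length` is a zero-copy byte window; the contents are easy
    // to predict, and the pointer check confirms no copy is made.
    #[test]
    fn test_slice_with_length() {
        let buf = LanceBuffer::from(vec![1_u8, 2, 3, 4, 5]);
        let slice = buf.slice_with_length(1, 3);
        assert_eq!(slice.as_ref(), &[2, 3, 4]);
        assert_eq!(slice.as_ptr(), unsafe { buf.as_ptr().add(1) });
    }
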
    #[test]
    fn test_zip() {
        let buf1 = LanceBuffer::from(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::reinterpret_vec(vec![1_u16, 2, 3]);
        let buf3 = LanceBuffer::reinterpret_vec(vec![1_u32, 2, 3]);

        let zipped = LanceBuffer::zip_into_one(vec![(buf1, 8), (buf2, 16), (buf3, 32)], 3).unwrap();

        assert_eq!(zipped.len(), 21);

        let mut expected = Vec::with_capacity(21);
        for i in 1..4 {
            expected.push(i as u8);
            expected.extend_from_slice(&(i as u16).to_ne_bytes());
            expected.extend_from_slice(&(i as u32).to_ne_bytes());
        }
        let expected = LanceBuffer::from(expected);

        assert_eq!(expected, zipped);
    }

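    // The IntoIterator impl walks the bytes front to back; a small sanity
    // check that it agrees with the underlying slice.
    #[test]
    fn test_into_iter() {
        let buf = LanceBuffer::from(vec![1_u8, 2, 3]);
        assert_eq!(buf.into_iter().collect::<Vec<_>>(), vec![1, 2, 3]);
        assert_eq!(LanceBuffer::empty().into_iter().count(), 0);
    }
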
    #[test]
    fn test_hex() {
        let buf = LanceBuffer::from(vec![1, 2, 15, 20]);
        assert_eq!("01020F14", buf.as_hex());
    }

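    // `as_spaced_hex` groups the hex output by word; the expected strings
    // follow from `as_hex` plus a space every `bytes_per_word` bytes.
    #[test]
    fn test_spaced_hex() {
        let buf = LanceBuffer::from(vec![1, 2, 15, 20]);
        assert_eq!("01 02 0F 14", buf.as_spaced_hex(1));
        assert_eq!("0102 0F14", buf.as_spaced_hex(2));
        assert_eq!("01020F14", buf.as_spaced_hex(4));
    }
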
    #[test]
    #[should_panic]
    fn test_to_typed_slice_invalid() {
        let buf = LanceBuffer::from(vec![0, 1, 2]);
        buf.borrow_to_typed_slice::<u16>();
    }

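    // A hedged check of `from_bytes`: an already-aligned `Bytes` should be
    // wrapped zero-copy. This assumes the Vec allocation behind `Bytes` is at
    // least 4-byte aligned, which holds for Rust's global allocator in
    // practice (the misaligned path is exercised in the test below).
    #[test]
    fn test_from_bytes_zero_copy() {
        let bytes = bytes::Bytes::from(vec![0_u8; 8]);
        let ptr = bytes.as_ptr();
        let buf = LanceBuffer::from_bytes(bytes, 4);
        assert_eq!(buf.as_ptr(), ptr);
    }
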
    #[test]
    fn test_to_typed_slice() {
        // Aligned case: no copy is made, so the view's pointer is stable
        // across calls.
        let buf = LanceBuffer::from(vec![0, 1]);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_eq!(view_ptr, view_ptr2);

        // Unaligned case: slicing one byte into the allocation misaligns the
        // data for u16, so each call copies and the pointers differ.
        let bytes = bytes::Bytes::from(vec![0, 1, 2]);
        let sliced = bytes.slice(1..3);
        let buf = LanceBuffer::from_bytes(sliced, 1);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_ne!(view_ptr, view_ptr2);
    }

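    // `borrow_to_typed_view` mirrors `borrow_to_typed_slice` but returns a
    // `Cow`: borrowed when aligned, owned when a copy was needed. The aligned
    // case below is guaranteed because the data comes from a `Vec<u16>`.
    #[test]
    fn test_to_typed_view() {
        let buf = LanceBuffer::reinterpret_vec(vec![1_u16, 2, 3]);
        let view = buf.borrow_to_typed_view::<u16>();
        assert!(matches!(view, std::borrow::Cow::Borrowed(_)));
        assert_eq!(view.as_ref(), &[1_u16, 2, 3]);
    }
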
    #[test]
    fn test_bit_slice_le() {
        let buf = LanceBuffer::from(vec![0x0F, 0x0B]);

        // Bits are indexed from the least significant bit of the first byte.
        assert_eq!(buf.bit_slice_le_with_length(0, 4).as_ref(), &[0x0F]);
        assert_eq!(buf.bit_slice_le_with_length(4, 4).as_ref(), &[0x00]);
        assert_eq!(buf.bit_slice_le_with_length(3, 8).as_ref(), &[0x61]);
        assert_eq!(buf.bit_slice_le_with_length(0, 8).as_ref(), &[0x0F]);
        assert_eq!(buf.bit_slice_le_with_length(4, 8).as_ref(), &[0xB0]);
        assert_eq!(buf.bit_slice_le_with_length(4, 12).as_ref(), &[0xB0, 0x00]);
    }
}