use std::any::type_name;
use std::cmp::Ordering;
use std::collections::Bound;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};
use std::ops::{Deref, RangeBounds};

use bytes::{Buf, Bytes};
use vortex_error::{VortexExpect, vortex_panic};

use crate::debug::TruncatedDebug;
use crate::{Alignment, BufferMut, ByteBuffer};

/// An immutable buffer of typed elements `T`, backed by [`Bytes`] and carrying an explicit
/// [`Alignment`]. Equality, ordering, and hashing are defined over the underlying bytes.
#[derive(Clone)]
pub struct Buffer<T> {
    pub(crate) bytes: Bytes,
    pub(crate) length: usize,
    pub(crate) alignment: Alignment,
    pub(crate) _marker: std::marker::PhantomData<T>,
}

impl<T> PartialEq for Buffer<T> {
    fn eq(&self, other: &Self) -> bool {
        self.bytes == other.bytes
    }
}

impl<T> Eq for Buffer<T> {}

impl<T> Ord for Buffer<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.bytes.cmp(&other.bytes)
    }
}

impl<T> PartialOrd for Buffer<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T> Hash for Buffer<T> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.bytes.as_ref().hash(state)
    }
}

impl<T> Buffer<T> {
    /// Returns a new buffer containing a copy of the provided values.
    pub fn copy_from(values: impl AsRef<[T]>) -> Self {
        BufferMut::copy_from(values).freeze()
    }

    /// Returns a new buffer copied from the provided values, with the requested alignment.
    pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
        BufferMut::copy_from_aligned(values, alignment).freeze()
    }

    /// Returns a zero-initialized buffer of `len` elements, aligned to `T`.
    pub fn zeroed(len: usize) -> Self {
        Self::zeroed_aligned(len, Alignment::of::<T>())
    }

    /// Returns a zero-initialized buffer of `len` elements with the requested alignment.
    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
        BufferMut::zeroed_aligned(len, alignment).freeze()
    }

    /// Returns an empty buffer aligned to `T`.
    pub fn empty() -> Self {
        BufferMut::empty().freeze()
    }

    /// Returns an empty buffer with the requested alignment.
    pub fn empty_aligned(alignment: Alignment) -> Self {
        BufferMut::empty_aligned(alignment).freeze()
    }

    /// Returns a buffer of `len` elements, each equal to `item`.
    pub fn full(item: T, len: usize) -> Self
    where
        T: Copy,
    {
        BufferMut::full(item, len).freeze()
    }

    /// Reinterprets a [`ByteBuffer`] as a buffer of `T`, aligned to `T`.
    pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
        Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
    }

    /// Reinterprets a [`ByteBuffer`] as a buffer of `T` with the requested alignment.
    pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
        Self::from_bytes_aligned(buffer.into_inner(), alignment)
    }

    /// Creates a buffer of `T` from raw [`Bytes`], panicking if the bytes are not aligned to
    /// the requested alignment or their length is not a multiple of `size_of::<T>()`.
    pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!(
                "Alignment {} must be compatible with the scalar type's alignment {}",
                alignment,
                Alignment::of::<T>(),
            );
        }
        if bytes.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!(
                "Bytes alignment must align to the requested alignment {}",
                alignment,
            );
        }
        if bytes.len() % size_of::<T>() != 0 {
            vortex_panic!(
                "Bytes length {} must be a multiple of the scalar type's size {}",
                bytes.len(),
                size_of::<T>()
            );
        }
        let length = bytes.len() / size_of::<T>();
        Self {
            bytes,
            length,
            alignment,
            _marker: Default::default(),
        }
    }

    /// Returns the number of elements in the buffer.
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.length
    }

    /// Returns true if the buffer contains no elements.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Returns the alignment of the buffer.
    #[inline(always)]
    pub fn alignment(&self) -> Alignment {
        self.alignment
    }

    /// Returns the contents of the buffer as a slice of `T`.
    #[inline(always)]
    pub fn as_slice(&self) -> &[T] {
        let raw_slice = self.bytes.as_ref();
        // SAFETY: construction guarantees the bytes are aligned for `T` and contain exactly
        // `self.length * size_of::<T>()` initialized bytes.
        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
    }

    /// Returns an iterator over references to the buffer's elements.
    pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
        self.as_slice().iter()
    }

    /// Returns a zero-copy slice of this buffer over the given element range, preserving the
    /// buffer's alignment.
    #[inline(always)]
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, self.alignment)
    }

    /// Returns a zero-copy slice of this buffer over the given element range, requiring only
    /// byte alignment.
    #[inline(always)]
    pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
        self.slice_with_alignment(range, Alignment::of::<u8>())
    }

    /// Returns a zero-copy slice of this buffer over the given element range with the requested
    /// alignment, panicking if the range is out of bounds or the result would not be aligned.
    pub fn slice_with_alignment(
        &self,
        range: impl RangeBounds<usize>,
        alignment: Alignment,
    ) -> Self {
        let len = self.len();
        let begin = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => len,
        };

        if begin > end {
            vortex_panic!(
                "range start must not be greater than end: {:?} <= {:?}",
                begin,
                end
            );
        }
        if end > len {
            vortex_panic!("range end out of bounds: {:?} <= {:?}", end, len);
        }

        if end == begin {
            // An empty slice can always be created with the requested alignment.
            return Self::empty_aligned(alignment);
        }

        let begin_byte = begin * size_of::<T>();
        let end_byte = end * size_of::<T>();

        if !begin_byte.is_multiple_of(*alignment) {
            vortex_panic!("range start must be aligned to {:?}", alignment);
        }
        if !end_byte.is_multiple_of(*alignment) {
            vortex_panic!("range end must be aligned to {:?}", alignment);
        }
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("Slice alignment must at least align to type T")
        }

        Self {
            bytes: self.bytes.slice(begin_byte..end_byte),
            length: end - begin,
            alignment,
            _marker: Default::default(),
        }
    }

    /// Returns a zero-copy slice of this buffer covering the given subset, which must be a
    /// sub-slice of this buffer.
    #[inline(always)]
    pub fn slice_ref(&self, subset: &[T]) -> Self {
        self.slice_ref_with_alignment(subset, Alignment::of::<T>())
    }

    /// Returns a zero-copy slice of this buffer covering the given subset, with the requested
    /// alignment.
    pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("slice_ref alignment must at least align to type T")
        }

        if !self.alignment.is_aligned_to(alignment) {
            vortex_panic!("slice_ref requested alignment must not exceed the buffer's alignment")
        }

        if subset.as_ptr().align_offset(*alignment) != 0 {
            vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
        }

        // SAFETY: reinterprets `subset` as its underlying bytes; `size_of_val` is the subset's
        // length in bytes.
        let subset_u8 =
            unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };

        Self {
            bytes: self.bytes.slice_ref(subset_u8),
            length: subset.len(),
            alignment,
            _marker: Default::default(),
        }
    }

    /// Returns a reference to the underlying [`Bytes`].
    pub fn inner(&self) -> &Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        &self.bytes
    }

    /// Consumes the buffer and returns the underlying [`Bytes`].
    pub fn into_inner(self) -> Bytes {
        debug_assert_eq!(
            self.length * size_of::<T>(),
            self.bytes.len(),
            "Own length has to be the same as the underlying bytes length"
        );
        self.bytes
    }

    /// Consumes the buffer and reinterprets it as a [`ByteBuffer`], preserving the alignment.
    pub fn into_byte_buffer(self) -> ByteBuffer {
        ByteBuffer {
            bytes: self.bytes,
            length: self.length * size_of::<T>(),
            alignment: self.alignment,
            _marker: Default::default(),
        }
    }

    /// Converts the buffer into a mutable [`BufferMut`], copying the contents if the underlying
    /// bytes cannot be uniquely reclaimed.
    pub fn into_mut(self) -> BufferMut<T> {
        self.try_into_mut()
            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
    }

    /// Attempts to convert the buffer into a mutable [`BufferMut`] without copying, returning
    /// the original buffer if the underlying bytes cannot be uniquely reclaimed.
    pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
        self.bytes
            .try_into_mut()
            .map(|bytes| BufferMut {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
            .map_err(|bytes| Self {
                bytes,
                length: self.length,
                alignment: self.alignment,
                _marker: Default::default(),
            })
    }

    /// Returns true if the buffer's data is aligned to the given alignment.
    pub fn is_aligned(&self, alignment: Alignment) -> bool {
        self.bytes.as_ptr().align_offset(*alignment) == 0
    }

    /// Returns a buffer with the requested alignment, copying the data if it is not already
    /// sufficiently aligned.
    pub fn aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            #[cfg(feature = "warn-copy")]
            {
                let bt = std::backtrace::Backtrace::capture();
                log::warn!(
                    "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
                )
            }
            Self::copy_from_aligned(self, alignment)
        }
    }

    /// Returns the buffer with the requested alignment, panicking if it is not already aligned.
    pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
        if self.as_ptr().align_offset(*alignment) == 0 {
            self.alignment = alignment;
            self
        } else {
            vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
        }
    }
}

impl<T: Debug> Debug for Buffer<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(&format!("Buffer<{}>", type_name::<T>()))
            .field("length", &self.length)
            .field("alignment", &self.alignment)
            .field("as_slice", &TruncatedDebug(self.as_slice()))
            .finish()
    }
}

impl<T> Deref for Buffer<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}

impl<T> AsRef<[T]> for Buffer<T> {
    fn as_ref(&self) -> &[T] {
        self.as_slice()
    }
}

impl<T> FromIterator<T> for Buffer<T> {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        BufferMut::from_iter(iter).freeze()
    }
}

impl From<Vec<u8>> for ByteBuffer {
    fn from(value: Vec<u8>) -> Self {
        Self::from(Bytes::from(value))
    }
}

impl From<Bytes> for ByteBuffer {
    fn from(bytes: Bytes) -> Self {
        let length = bytes.len();
        Self {
            bytes,
            length,
            alignment: Alignment::of::<u8>(),
            _marker: Default::default(),
        }
    }
}

impl Buf for ByteBuffer {
    fn remaining(&self) -> usize {
        self.len()
    }

    fn chunk(&self) -> &[u8] {
        self.as_slice()
    }

    fn advance(&mut self, cnt: usize) {
        if !cnt.is_multiple_of(*self.alignment) {
            vortex_panic!(
                "Cannot advance buffer by {} bytes, resulting alignment is not {}",
                cnt,
                self.alignment
            );
        }
        self.bytes.advance(cnt);
        self.length -= cnt;
    }
}

/// An owning iterator over the elements of a [`Buffer`].
pub struct BufferIterator<T> {
    buffer: Buffer<T>,
    index: usize,
}

impl<T: Copy> Iterator for BufferIterator<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        (self.index < self.buffer.len()).then(move || {
            let value = self.buffer.as_slice()[self.index];
            self.index += 1;
            value
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.buffer.len() - self.index;
        (remaining, Some(remaining))
    }
}

impl<T: Copy> IntoIterator for Buffer<T> {
    type Item = T;
    type IntoIter = BufferIterator<T>;

    fn into_iter(self) -> Self::IntoIter {
        BufferIterator {
            buffer: self,
            index: 0,
        }
    }
}

impl<T> From<BufferMut<T>> for Buffer<T> {
    fn from(value: BufferMut<T>) -> Self {
        value.freeze()
    }
}

#[cfg(test)]
mod test {
    use bytes::Buf;

    use crate::{Alignment, ByteBuffer, buffer};

    #[test]
    fn align() {
        let buf = buffer![0u8, 1, 2];
        let aligned = buf.aligned(Alignment::new(32));
        assert_eq!(aligned.alignment(), Alignment::new(32));
        assert_eq!(aligned.as_slice(), &[0, 1, 2]);
    }

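    // Illustrative sketch, not from the original test suite: `is_aligned` reports on the data
    // pointer, while `ensure_aligned` asserts (without copying) that the requested alignment
    // already holds. Every pointer is 1-byte aligned, so this cannot panic.
    #[test]
    fn ensure_aligned_byte() {
        let buf = buffer![0u8, 1, 2];
        assert!(buf.is_aligned(Alignment::of::<u8>()));
        let buf = buf.ensure_aligned(Alignment::of::<u8>());
        assert_eq!(buf.alignment(), Alignment::of::<u8>());
        assert_eq!(buf.as_slice(), &[0, 1, 2]);
    }
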
    #[test]
    fn slice() {
        let buf = buffer![0, 1, 2, 3, 4];
        assert_eq!(buf.slice(1..3).as_slice(), &[1, 2]);
        assert_eq!(buf.slice(1..=3).as_slice(), &[1, 2, 3]);
    }

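    // Illustrative sketch, not from the original test suite: an empty range yields an empty
    // buffer, and `slice_ref` re-slices the buffer from a sub-slice borrowed out of it.
    #[test]
    fn slice_ref_and_empty() {
        let buf = buffer![0i32, 1, 2, 3, 4];
        assert!(buf.slice(2..2).is_empty());

        let subset = &buf.as_slice()[1..3];
        assert_eq!(buf.slice_ref(subset).as_slice(), &[1, 2]);
    }
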
    #[test]
    fn slice_unaligned() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        // Slicing at a byte offset that is not i32-aligned is fine when only byte alignment
        // is requested.
        buf.slice_unaligned(1..2);
    }

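    // Illustrative sketch, not from the original test suite: `into_byte_buffer` reinterprets a
    // typed buffer as bytes, so the length becomes the element count times `size_of::<i32>()`
    // while the original alignment is preserved.
    #[test]
    fn into_byte_buffer_len() {
        let buf = buffer![0i32, 1, 2, 3, 4];
        assert_eq!(buf.len(), 5);

        let bytes = buf.into_byte_buffer();
        assert_eq!(bytes.len(), 5 * size_of::<i32>());
        assert_eq!(bytes.alignment(), Alignment::of::<i32>());
    }
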
    #[test]
    #[should_panic]
    fn slice_bad_alignment() {
        let buf = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        // The byte buffer keeps the i32 alignment, so slicing at byte offset 1 must panic.
        buf.slice(1..2);
    }

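    // Illustrative sketch, not from the original test suite: converting from a `Vec<u8>` via
    // `From` produces a byte-aligned buffer over the same contents.
    #[test]
    fn from_vec() {
        let buf = ByteBuffer::from(vec![1u8, 2, 3]);
        assert_eq!(buf.len(), 3);
        assert_eq!(buf.alignment(), Alignment::of::<u8>());
        assert_eq!(buf.as_slice(), &[1, 2, 3]);
    }
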
    #[test]
    fn bytes_buf() {
        let mut buf = ByteBuffer::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }
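
    // Illustrative sketch, not from the original test suite: a `Buffer` round-trips through
    // `BufferMut` via `into_mut`/`freeze`, and owning iteration yields the elements by value.
    #[test]
    fn into_mut_and_iter() {
        let buf = buffer![1i32, 2, 3];
        let collected: Vec<i32> = buf.clone().into_iter().collect();
        assert_eq!(collected, vec![1, 2, 3]);

        let frozen = buf.into_mut().freeze();
        assert_eq!(frozen.as_slice(), &[1, 2, 3]);
    }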
}