1use std::any::type_name;
5use std::cmp::Ordering;
6use std::collections::Bound;
7use std::fmt::{Debug, Formatter};
8use std::hash::{Hash, Hasher};
9use std::ops::{Deref, RangeBounds};
10
11use bytes::{Buf, Bytes};
12use vortex_error::{VortexExpect, vortex_panic};
13
14use crate::debug::TruncatedDebug;
15use crate::trusted_len::TrustedLen;
16use crate::{Alignment, BufferMut, ByteBuffer};
17
18#[derive(Clone)]
20pub struct Buffer<T> {
21 pub(crate) bytes: Bytes,
22 pub(crate) length: usize,
23 pub(crate) alignment: Alignment,
24 pub(crate) _marker: std::marker::PhantomData<T>,
25}
26
27impl<T> PartialEq for Buffer<T> {
28 fn eq(&self, other: &Self) -> bool {
29 self.bytes == other.bytes
30 }
31}
32
33impl<T> Eq for Buffer<T> {}
34
35impl<T> Ord for Buffer<T> {
36 fn cmp(&self, other: &Self) -> Ordering {
37 self.bytes.cmp(&other.bytes)
38 }
39}
40
41impl<T> PartialOrd for Buffer<T> {
42 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
43 Some(self.cmp(other))
44 }
45}
46
47impl<T> Hash for Buffer<T> {
48 fn hash<H: Hasher>(&self, state: &mut H) {
49 self.bytes.as_ref().hash(state)
50 }
51}
52
53impl<T> Buffer<T> {
54 pub fn copy_from(values: impl AsRef<[T]>) -> Self {
61 BufferMut::copy_from(values).freeze()
62 }
63
64 pub fn copy_from_aligned(values: impl AsRef<[T]>, alignment: Alignment) -> Self {
66 BufferMut::copy_from_aligned(values, alignment).freeze()
67 }
68
69 pub fn zeroed(len: usize) -> Self {
71 Self::zeroed_aligned(len, Alignment::of::<T>())
72 }
73
74 pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
76 BufferMut::zeroed_aligned(len, alignment).freeze()
77 }
78
79 pub fn empty() -> Self {
81 BufferMut::empty().freeze()
82 }
83
84 pub fn empty_aligned(alignment: Alignment) -> Self {
86 BufferMut::empty_aligned(alignment).freeze()
87 }
88
89 pub fn full(item: T, len: usize) -> Self
91 where
92 T: Copy,
93 {
94 BufferMut::full(item, len).freeze()
95 }
96
97 pub fn from_byte_buffer(buffer: ByteBuffer) -> Self {
104 Self::from_byte_buffer_aligned(buffer, Alignment::of::<T>())
106 }
107
108 pub fn from_byte_buffer_aligned(buffer: ByteBuffer, alignment: Alignment) -> Self {
115 Self::from_bytes_aligned(buffer.into_inner(), alignment)
116 }
117
118 pub fn from_bytes_aligned(bytes: Bytes, alignment: Alignment) -> Self {
125 if !alignment.is_aligned_to(Alignment::of::<T>()) {
126 vortex_panic!(
127 "Alignment {} must be compatible with the scalar type's alignment {}",
128 alignment,
129 Alignment::of::<T>(),
130 );
131 }
132 if bytes.as_ptr().align_offset(*alignment) != 0 {
133 vortex_panic!(
134 "Bytes alignment must align to the requested alignment {}",
135 alignment,
136 );
137 }
138 if bytes.len() % size_of::<T>() != 0 {
139 vortex_panic!(
140 "Bytes length {} must be a multiple of the scalar type's size {}",
141 bytes.len(),
142 size_of::<T>()
143 );
144 }
145 let length = bytes.len() / size_of::<T>();
146 Self {
147 bytes,
148 length,
149 alignment,
150 _marker: Default::default(),
151 }
152 }
153
154 pub fn from_trusted_len_iter<I: TrustedLen<Item = T>>(iter: I) -> Self {
157 let (_, high) = iter.size_hint();
158 let mut buffer =
159 BufferMut::with_capacity(high.vortex_expect("TrustedLen iterator has no upper bound"));
160 buffer.extend_trusted(iter);
161 buffer.freeze()
162 }
163
164 #[inline(always)]
166 pub fn len(&self) -> usize {
167 self.length
168 }
169
170 #[inline(always)]
172 pub fn is_empty(&self) -> bool {
173 self.length == 0
174 }
175
176 #[inline(always)]
178 pub fn alignment(&self) -> Alignment {
179 self.alignment
180 }
181
182 #[inline(always)]
184 pub fn as_slice(&self) -> &[T] {
185 let raw_slice = self.bytes.as_ref();
186 unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
188 }
189
190 pub fn iter(&self) -> Iter<'_, T> {
192 Iter {
193 inner: self.as_slice().iter(),
194 }
195 }
196
197 #[inline(always)]
204 pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
205 self.slice_with_alignment(range, self.alignment)
206 }
207
208 #[inline(always)]
215 pub fn slice_unaligned(&self, range: impl RangeBounds<usize>) -> Self {
216 self.slice_with_alignment(range, Alignment::of::<u8>())
217 }
218
219 pub fn slice_with_alignment(
227 &self,
228 range: impl RangeBounds<usize>,
229 alignment: Alignment,
230 ) -> Self {
231 let len = self.len();
232 let begin = match range.start_bound() {
233 Bound::Included(&n) => n,
234 Bound::Excluded(&n) => n.checked_add(1).vortex_expect("out of range"),
235 Bound::Unbounded => 0,
236 };
237 let end = match range.end_bound() {
238 Bound::Included(&n) => n.checked_add(1).vortex_expect("out of range"),
239 Bound::Excluded(&n) => n,
240 Bound::Unbounded => len,
241 };
242
243 if begin > end {
244 vortex_panic!(
245 "range start must not be greater than end: {:?} <= {:?}",
246 begin,
247 end
248 );
249 }
250 if end > len {
251 vortex_panic!("range end out of bounds: {:?} <= {:?}", end, len);
252 }
253
254 if end == begin {
255 return Self::empty_aligned(alignment);
258 }
259
260 let begin_byte = begin * size_of::<T>();
261 let end_byte = end * size_of::<T>();
262
263 if !begin_byte.is_multiple_of(*alignment) {
264 vortex_panic!("range start must be aligned to {:?}", alignment);
265 }
266 if !end_byte.is_multiple_of(*alignment) {
267 vortex_panic!("range end must be aligned to {:?}", alignment);
268 }
269 if !alignment.is_aligned_to(Alignment::of::<T>()) {
270 vortex_panic!("Slice alignment must at least align to type T")
271 }
272
273 Self {
274 bytes: self.bytes.slice(begin_byte..end_byte),
275 length: end - begin,
276 alignment,
277 _marker: Default::default(),
278 }
279 }
280
281 #[inline(always)]
290 pub fn slice_ref(&self, subset: &[T]) -> Self {
291 self.slice_ref_with_alignment(subset, Alignment::of::<T>())
292 }
293
294 pub fn slice_ref_with_alignment(&self, subset: &[T], alignment: Alignment) -> Self {
304 if !alignment.is_aligned_to(Alignment::of::<T>()) {
305 vortex_panic!("slice_ref alignment must at least align to type T")
306 }
307
308 if !self.alignment.is_aligned_to(alignment) {
309 vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
310 }
311
312 if subset.as_ptr().align_offset(*alignment) != 0 {
313 vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
314 }
315
316 let subset_u8 =
317 unsafe { std::slice::from_raw_parts(subset.as_ptr().cast(), size_of_val(subset)) };
318
319 Self {
320 bytes: self.bytes.slice_ref(subset_u8),
321 length: subset.len(),
322 alignment,
323 _marker: Default::default(),
324 }
325 }
326
327 pub fn inner(&self) -> &Bytes {
329 debug_assert_eq!(
330 self.length * size_of::<T>(),
331 self.bytes.len(),
332 "Own length has to be the same as the underlying bytes length"
333 );
334 &self.bytes
335 }
336
337 pub fn into_inner(self) -> Bytes {
339 debug_assert_eq!(
340 self.length * size_of::<T>(),
341 self.bytes.len(),
342 "Own length has to be the same as the underlying bytes length"
343 );
344 self.bytes
345 }
346
347 pub fn into_byte_buffer(self) -> ByteBuffer {
349 ByteBuffer {
350 bytes: self.bytes,
351 length: self.length * size_of::<T>(),
352 alignment: self.alignment,
353 _marker: Default::default(),
354 }
355 }
356
357 pub fn into_mut(self) -> BufferMut<T> {
359 self.try_into_mut()
360 .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
361 }
362
363 pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
365 self.bytes
366 .try_into_mut()
367 .map(|bytes| BufferMut {
368 bytes,
369 length: self.length,
370 alignment: self.alignment,
371 _marker: Default::default(),
372 })
373 .map_err(|bytes| Self {
374 bytes,
375 length: self.length,
376 alignment: self.alignment,
377 _marker: Default::default(),
378 })
379 }
380
381 pub fn is_aligned(&self, alignment: Alignment) -> bool {
383 self.bytes.as_ptr().align_offset(*alignment) == 0
384 }
385
386 pub fn aligned(mut self, alignment: Alignment) -> Self {
388 if self.as_ptr().align_offset(*alignment) == 0 {
389 self.alignment = alignment;
390 self
391 } else {
392 #[cfg(feature = "warn-copy")]
393 {
394 let bt = std::backtrace::Backtrace::capture();
395 log::warn!(
396 "Buffer is not aligned to requested alignment {alignment}, copying: {bt}"
397 )
398 }
399 Self::copy_from_aligned(self, alignment)
400 }
401 }
402
403 pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
405 if self.as_ptr().align_offset(*alignment) == 0 {
406 self.alignment = alignment;
407 self
408 } else {
409 vortex_panic!("Buffer is not aligned to requested alignment {}", alignment)
410 }
411 }
412}
413
/// Borrowing iterator over a buffer's elements; a thin wrapper around
/// [`std::slice::Iter`].
pub struct Iter<'a, T> {
    /// The wrapped slice iterator that every method forwards to.
    inner: std::slice::Iter<'a, T>,
}

/// Forwards each method to the wrapped slice iterator.
impl<'a, T> Iterator for Iter<'a, T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<Self::Item> {
        Iterator::next(&mut self.inner)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        Iterator::size_hint(&self.inner)
    }

    fn count(self) -> usize {
        let Self { inner } = self;
        inner.count()
    }

    fn last(self) -> Option<Self::Item> {
        let Self { inner } = self;
        inner.last()
    }

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        Iterator::nth(&mut self.inner, n)
    }
}

/// The remaining length is the wrapped slice iterator's remaining length.
impl<T> ExactSizeIterator for Iter<'_, T> {
    fn len(&self) -> usize {
        ExactSizeIterator::len(&self.inner)
    }
}
450
451impl<T: Debug> Debug for Buffer<T> {
452 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
453 f.debug_struct(&format!("Buffer<{}>", type_name::<T>()))
454 .field("length", &self.length)
455 .field("alignment", &self.alignment)
456 .field("as_slice", &TruncatedDebug(self.as_slice()))
457 .finish()
458 }
459}
460
461impl<T> Deref for Buffer<T> {
462 type Target = [T];
463
464 fn deref(&self) -> &Self::Target {
465 self.as_slice()
466 }
467}
468
469impl<T> AsRef<[T]> for Buffer<T> {
470 fn as_ref(&self) -> &[T] {
471 self.as_slice()
472 }
473}
474
475impl<T> FromIterator<T> for Buffer<T> {
476 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
477 BufferMut::from_iter(iter).freeze()
478 }
479}
480
481impl From<Vec<u8>> for ByteBuffer {
483 fn from(value: Vec<u8>) -> Self {
484 Self::from(Bytes::from(value))
485 }
486}
487
488impl From<Bytes> for ByteBuffer {
490 fn from(bytes: Bytes) -> Self {
491 let length = bytes.len();
492 Self {
493 bytes,
494 length,
495 alignment: Alignment::of::<u8>(),
496 _marker: Default::default(),
497 }
498 }
499}
500
501impl Buf for ByteBuffer {
502 fn remaining(&self) -> usize {
503 self.len()
504 }
505
506 fn chunk(&self) -> &[u8] {
507 self.as_slice()
508 }
509
510 fn advance(&mut self, cnt: usize) {
511 if !cnt.is_multiple_of(*self.alignment) {
512 vortex_panic!(
513 "Cannot advance buffer by {} items, resulting alignment is not {}",
514 cnt,
515 self.alignment
516 );
517 }
518 self.bytes.advance(cnt);
519 self.length -= cnt;
520 }
521}
522
523pub struct BufferIterator<T> {
525 buffer: Buffer<T>,
526 index: usize,
527}
528
529impl<T: Copy> Iterator for BufferIterator<T> {
530 type Item = T;
531
532 fn next(&mut self) -> Option<Self::Item> {
533 (self.index < self.buffer.len()).then(move || {
534 let value = self.buffer[self.index];
535 self.index += 1;
536 value
537 })
538 }
539
540 fn size_hint(&self) -> (usize, Option<usize>) {
541 let remaining = self.buffer.len() - self.index;
542 (remaining, Some(remaining))
543 }
544}
545
546impl<T: Copy> IntoIterator for Buffer<T> {
547 type Item = T;
548 type IntoIter = BufferIterator<T>;
549
550 fn into_iter(self) -> Self::IntoIter {
551 BufferIterator {
552 buffer: self,
553 index: 0,
554 }
555 }
556}
557
558impl<T> From<BufferMut<T>> for Buffer<T> {
559 fn from(value: BufferMut<T>) -> Self {
560 value.freeze()
561 }
562}
563
#[cfg(test)]
mod test {
    use bytes::Buf;

    use crate::{Alignment, ByteBuffer, buffer};

    /// Re-aligning keeps the contents and reports the requested alignment.
    #[test]
    fn align() {
        let wanted = Alignment::new(32);
        let realigned = buffer![0u8, 1, 2].aligned(wanted);
        assert_eq!(realigned.alignment(), wanted);
        assert_eq!(realigned.as_slice(), &[0, 1, 2]);
    }

    /// Exclusive and inclusive element ranges select the expected values.
    #[test]
    fn slice() {
        let values = buffer![0, 1, 2, 3, 4];
        let exclusive = values.slice(1..3);
        assert_eq!(exclusive.as_slice(), &[1, 2]);
        let inclusive = values.slice(1..=3);
        assert_eq!(inclusive.as_slice(), &[1, 2, 3]);
    }

    /// A byte range that is not a multiple of the buffer's alignment is
    /// accepted when the slice is explicitly unaligned.
    #[test]
    fn slice_unaligned() {
        let bytes = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        // Must not panic.
        let _ = bytes.slice_unaligned(1..2);
    }

    /// The same byte range panics when the buffer's own alignment is kept.
    #[test]
    #[should_panic]
    fn slice_bad_alignment() {
        let bytes = buffer![0i32, 1, 2, 3, 4].into_byte_buffer();
        // Expected to panic.
        let _ = bytes.slice(1..2);
    }

    /// The `bytes::Buf` view shrinks from the front as the buffer is advanced.
    #[test]
    fn bytes_buf() {
        let mut cursor = ByteBuffer::copy_from("helloworld".as_bytes());
        assert_eq!(cursor.remaining(), 10);
        assert_eq!(cursor.chunk(), b"helloworld");

        Buf::advance(&mut cursor, 5);
        assert_eq!(cursor.remaining(), 5);
        assert_eq!(cursor.as_slice(), b"world");
        assert_eq!(cursor.chunk(), b"world");
    }
}