1use std::ops::{Deref, Range, RangeBounds};
2use std::sync::LazyLock;
3
4use bytemuck::{Pod, Zeroable};
5use either::Either;
6use polars_utils::range::{check_range, decode_range_unchecked};
7
8use crate::storage::SharedStorage;
9
/// A cheaply clonable, sliceable view over a [`SharedStorage`].
///
/// The view is described by a start pointer and a length; the storage handle
/// is kept alongside so the memory behind `ptr` stays reachable.
pub struct Buffer<T> {
    /// Handle to the underlying storage this buffer views.
    storage: SharedStorage<T>,

    /// Pointer to the first element of the view. It is initialised from
    /// `storage.as_ptr()` and advanced when the buffer is sliced.
    ptr: *const T,

    /// Number of `T` elements in the view.
    length: usize,
}
49
50impl<T> Clone for Buffer<T> {
51 fn clone(&self) -> Self {
52 Self {
53 storage: self.storage.clone(),
54 ptr: self.ptr,
55 length: self.length,
56 }
57 }
58}
59
60unsafe impl<T: Send + Sync> Sync for Buffer<T> {}
61unsafe impl<T: Send + Sync> Send for Buffer<T> {}
62
63impl<T: PartialEq> PartialEq for Buffer<T> {
64 #[inline]
65 fn eq(&self, other: &Self) -> bool {
66 self.deref() == other.deref()
67 }
68}
69
// Equality is delegated to the viewed slice, so it is total whenever `T: Eq`.
impl<T: Eq> Eq for Buffer<T> {}
71
72impl<T: std::hash::Hash> std::hash::Hash for Buffer<T> {
73 #[inline]
74 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
75 self.as_slice().hash(state);
76 }
77}
78
79impl<T: std::fmt::Debug> std::fmt::Debug for Buffer<T> {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 std::fmt::Debug::fmt(&**self, f)
82 }
83}
84
85impl<T> Default for Buffer<T> {
86 #[inline]
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl<T> Buffer<T> {
    /// Creates a buffer over `SharedStorage::empty()`.
    #[inline]
    pub const fn new() -> Self {
        Self::from_storage(SharedStorage::empty())
    }
98
99 pub const fn from_storage(storage: SharedStorage<T>) -> Self {
101 let ptr = storage.as_ptr();
102 let length = storage.len();
103 Buffer {
104 storage,
105 ptr,
106 length,
107 }
108 }
109
110 pub fn from_static(data: &'static [T]) -> Self {
112 Self::from_storage(SharedStorage::from_static(data))
113 }
114
115 pub fn from_vec(data: Vec<T>) -> Self {
117 Self::from_storage(SharedStorage::from_vec(data))
118 }
119
120 pub fn from_owner<O: Send + AsRef<[T]> + 'static>(owner: O) -> Self {
122 Self::from_storage(SharedStorage::from_owner(owner))
123 }
124
125 pub fn with_slice<R, F: FnOnce(Buffer<T>) -> R>(slice: &[T], f: F) -> R {
129 SharedStorage::with_slice(slice, |ss| f(Self::from_storage(ss)))
130 }
131
132 pub fn with_vec<R, F: FnOnce(Buffer<T>) -> R>(vec: &mut Vec<T>, f: F) -> R {
137 SharedStorage::with_vec(vec, |ss| f(Self::from_storage(ss)))
138 }
139
140 pub fn into_storage(self) -> SharedStorage<T> {
142 self.storage
143 }
144
    /// Returns the number of elements in this buffer's view.
    #[inline]
    pub fn len(&self) -> usize {
        self.length
    }
150
151 #[inline]
153 pub fn is_empty(&self) -> bool {
154 self.length == 0
155 }
156
157 pub fn is_sliced(&self) -> bool {
161 self.storage.len() != self.length
162 }
163
164 pub fn expand_end_to_storage(self) -> Self {
168 unsafe {
169 let offset = self.ptr.offset_from(self.storage.as_ptr()) as usize;
170 Self {
171 ptr: self.ptr,
172 length: self.storage.len() - offset,
173 storage: self.storage,
174 }
175 }
176 }
177
178 #[inline]
180 pub fn as_slice(&self) -> &[T] {
181 debug_assert!(self.offset() + self.length <= self.storage.len());
183 unsafe { std::slice::from_raw_parts(self.ptr, self.length) }
184 }
185
    /// Returns this buffer narrowed to `range`, where `range` is relative to
    /// the current view. The underlying storage is shared, not copied.
    ///
    /// Range validation is performed by [`Buffer::slice_in_place`].
    #[inline]
    #[must_use]
    pub fn sliced<R: RangeBounds<usize>>(mut self, range: R) -> Self {
        self.slice_in_place(range);
        self
    }
197
    /// Returns this buffer narrowed to `range` without validating the range.
    ///
    /// # Safety
    /// The caller must uphold the requirements of
    /// [`Buffer::slice_in_place_unchecked`].
    #[inline]
    #[must_use]
    pub unsafe fn sliced_unchecked<R: RangeBounds<usize>>(mut self, range: R) -> Self {
        // SAFETY: the contract is forwarded to the caller.
        unsafe {
            self.slice_in_place_unchecked(range);
        }
        self
    }
211
212 #[inline]
217 pub fn slice_in_place<R: RangeBounds<usize>>(&mut self, range: R) {
218 unsafe {
219 let Range { start, end } = check_range(range, ..self.len());
220 self.ptr = self.ptr.add(start);
221 self.length = end - start;
222 }
223 }
224
225 #[inline]
230 pub unsafe fn slice_in_place_unchecked<R: RangeBounds<usize>>(&mut self, range: R) {
231 unsafe {
232 let Range { start, end } = decode_range_unchecked(range, ..self.len());
233 self.ptr = self.ptr.add(start);
234 self.length = end - start;
235 }
236 }
237
238 #[must_use]
247 pub fn split_at(self, mid: usize) -> (Self, Self) {
248 (self.clone().sliced(..mid), self.sliced(mid..))
249 }
250
251 #[must_use]
260 pub fn split_off(&mut self, at: usize) -> Self {
261 let out = self.clone().sliced(at..);
262 self.slice_in_place(..at);
263 out
264 }
265
    /// Returns the base pointer of the underlying storage (not the start of
    /// the view; see [`Buffer::offset`] for the distance between the two).
    #[inline]
    pub fn storage_ptr(&self) -> *const T {
        self.storage.as_ptr()
    }
271
272 #[inline]
274 pub fn offset(&self) -> usize {
275 unsafe {
276 let ret = self.ptr.offset_from(self.storage.as_ptr()) as usize;
277 debug_assert!(ret <= self.storage.len());
278 ret
279 }
280 }
281
    /// Overwrites the length of the view without any checks.
    ///
    /// # Safety
    /// `self.offset() + len` must not exceed the length of the underlying
    /// storage; [`Buffer::as_slice`] builds a slice of `len` elements from the
    /// view pointer without further validation.
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        self.length = len;
    }
288
289 #[inline]
295 pub fn into_mut(mut self) -> Either<Self, Vec<T>> {
296 if self.is_sliced() {
298 return Either::Left(self);
299 }
300 match self.storage.try_into_vec() {
301 Ok(v) => Either::Right(v),
302 Err(slf) => {
303 self.storage = slf;
304 Either::Left(self)
305 },
306 }
307 }
308
309 #[inline]
315 pub fn get_mut_slice(&mut self) -> Option<&mut [T]> {
316 let offset = self.offset();
317 let slice = self.storage.try_as_mut_slice()?;
318 Some(unsafe { slice.get_unchecked_mut(offset..offset + self.length) })
319 }
320
    /// Returns the reference count reported by the underlying storage.
    pub fn storage_refcount(&self) -> u64 {
        self.storage.refcount()
    }
326
327 pub fn is_same_buffer(&self, other: &Self) -> bool {
329 self.ptr == other.ptr && self.length == other.length
330 }
331}
332
333impl<T: Pod> Buffer<T> {
334 pub fn try_transmute<U: Pod>(mut self) -> Result<Buffer<U>, Self> {
335 assert_ne!(size_of::<U>(), 0);
336 let ptr = self.ptr as *const U;
337 let length = self.length;
338 match self.storage.try_transmute() {
339 Err(v) => {
340 self.storage = v;
341 Err(self)
342 },
343 Ok(storage) => Ok(Buffer {
344 storage,
345 ptr,
346 length: length.checked_mul(size_of::<T>()).expect("overflow") / size_of::<U>(),
347 }),
348 }
349 }
350}
351
352impl<T: Clone> Buffer<T> {
353 pub fn to_vec(self) -> Vec<T> {
354 match self.into_mut() {
355 Either::Right(v) => v,
356 Either::Left(same) => same.as_slice().to_vec(),
357 }
358 }
359}
360
/// A 4096-byte block of bytes with 4096-byte alignment; the element type of
/// the global zero storage.
#[derive(Clone, Copy)]
#[repr(C, align(4096))]
struct Aligned([u8; 4096]);
364
365const GLOBAL_ZERO_SIZE: usize = 8 * 1024 * 1024;
368static GLOBAL_ZEROES: LazyLock<SharedStorage<Aligned>> = LazyLock::new(|| {
369 assert!(GLOBAL_ZERO_SIZE.is_multiple_of(size_of::<Aligned>()));
370 let chunks = GLOBAL_ZERO_SIZE / size_of::<Aligned>();
371 let v = vec![Aligned([0; _]); chunks];
372 let mut ss = SharedStorage::from_vec(v);
373 ss.leak();
374 ss
375});
376
377impl<T: Zeroable> Buffer<T> {
378 pub fn zeroed(length: usize) -> Self {
379 let bytes_needed = length * size_of::<T>();
380 if align_of::<T>() <= align_of::<Aligned>() && bytes_needed <= GLOBAL_ZERO_SIZE {
381 unsafe {
382 let storage = GLOBAL_ZEROES.clone().transmute_unchecked::<T>();
384 let ptr = storage.as_ptr();
385 Buffer {
386 storage,
387 ptr,
388 length,
389 }
390 }
391 } else {
392 bytemuck::zeroed_vec(length).into()
393 }
394 }
395}
396
397impl<T> From<Vec<T>> for Buffer<T> {
398 #[inline]
399 fn from(v: Vec<T>) -> Self {
400 Self::from_vec(v)
401 }
402}
403
404impl<T> Deref for Buffer<T> {
405 type Target = [T];
406
407 #[inline(always)]
408 fn deref(&self) -> &[T] {
409 self.as_slice()
410 }
411}
412
413impl<T> AsRef<[T]> for Buffer<T> {
414 #[inline(always)]
415 fn as_ref(&self) -> &[T] {
416 self.as_slice()
417 }
418}
419
420impl<T> FromIterator<T> for Buffer<T> {
421 #[inline]
422 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
423 Vec::from_iter(iter).into()
424 }
425}
426
#[cfg(feature = "serde")]
mod _serde_impl {
    use serde::{Deserialize, Serialize};

    use super::Buffer;

    /// A buffer serializes exactly like the slice it views.
    impl<T> Serialize for Buffer<T>
    where
        T: Serialize,
    {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            let elements: &[T] = self.as_slice();
            Serialize::serialize(elements, serializer)
        }
    }

    /// A buffer deserializes as a `Vec<T>` that is then wrapped in a buffer.
    impl<'de, T> Deserialize<'de> for Buffer<T>
    where
        T: Deserialize<'de>,
    {
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: serde::Deserializer<'de>,
        {
            let values = <Vec<T> as Deserialize>::deserialize(deserializer)?;
            Ok(Buffer::from(values))
        }
    }
}
457
458impl<T: Copy> IntoIterator for Buffer<T> {
459 type Item = T;
460
461 type IntoIter = IntoIter<T>;
462
463 fn into_iter(self) -> Self::IntoIter {
464 IntoIter::new(self)
465 }
466}
467
/// By-value iterator over the elements of a [`Buffer`].
///
/// Elements in `index..end` are still to be yielded; the front advances
/// `index` and the back decrements `end`.
#[derive(Debug, Clone)]
pub struct IntoIter<T: Copy> {
    /// The buffer being iterated.
    values: Buffer<T>,
    /// Position of the next element yielded from the front.
    index: usize,
    /// One past the position of the next element yielded from the back.
    end: usize,
}
475
476impl<T: Copy> IntoIter<T> {
477 #[inline]
478 fn new(values: Buffer<T>) -> Self {
479 let end = values.len();
480 Self {
481 values,
482 index: 0,
483 end,
484 }
485 }
486}
487
488impl<T: Copy> Iterator for IntoIter<T> {
489 type Item = T;
490
491 #[inline]
492 fn next(&mut self) -> Option<Self::Item> {
493 if self.index == self.end {
494 return None;
495 }
496 let old = self.index;
497 self.index += 1;
498 Some(*unsafe { self.values.get_unchecked(old) })
499 }
500
501 #[inline]
502 fn size_hint(&self) -> (usize, Option<usize>) {
503 (self.end - self.index, Some(self.end - self.index))
504 }
505
506 #[inline]
507 fn nth(&mut self, n: usize) -> Option<Self::Item> {
508 let new_index = self.index + n;
509 if new_index > self.end {
510 self.index = self.end;
511 None
512 } else {
513 self.index = new_index;
514 self.next()
515 }
516 }
517}
518
519impl<T: Copy> DoubleEndedIterator for IntoIter<T> {
520 #[inline]
521 fn next_back(&mut self) -> Option<Self::Item> {
522 if self.index == self.end {
523 None
524 } else {
525 self.end -= 1;
526 Some(*unsafe { self.values.get_unchecked(self.end) })
527 }
528 }
529}
530
// `size_hint` returns the exact remaining count (`end - index`) as both bounds.
impl<T: Copy> ExactSizeIterator for IntoIter<T> {}