use core::alloc::{Allocator, Layout};
use std::ptr::NonNull;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering};
use crate::{Bitmask, Buffer, Vec64Alloc};
pub struct LBuffer<T, const MASK: bool = false> {
inner: Arc<LBufferInner<T>>,
mask: Option<Arc<LBufferInner<u8>>>,
}
struct LBufferInner<T> {
base: NonNull<T>,
capacity: usize,
len: AtomicUsize,
sealed: AtomicBool,
mask_tail: Option<LMaskTail>,
}
pub(crate) struct LMaskTail {
cell: AtomicU64,
}
impl LMaskTail {
const COUNT_MASK: u64 = 0xFF;
const INDEX_SHIFT: u64 = 8;
#[inline]
fn new() -> Self {
Self {
cell: AtomicU64::new(0),
}
}
#[inline]
fn pack(index: usize, count: usize) -> u64 {
((index as u64) << Self::INDEX_SHIFT) | count as u64
}
#[inline]
pub(crate) fn load(&self) -> (usize, usize) {
let v = self.cell.load(Ordering::Acquire);
((v >> Self::INDEX_SHIFT) as usize, (v & Self::COUNT_MASK) as usize)
}
#[inline]
fn load_relaxed(&self) -> (usize, usize) {
let v = self.cell.load(Ordering::Relaxed);
((v >> Self::INDEX_SHIFT) as usize, (v & Self::COUNT_MASK) as usize)
}
#[inline]
fn publish(&self, index: usize, count: usize) {
self.cell.store(Self::pack(index, count), Ordering::Release);
}
}
unsafe impl<T: Send + Sync> Send for LBufferInner<T> {}
unsafe impl<T: Send + Sync> Sync for LBufferInner<T> {}
impl<T> LBuffer<T, false> {
pub fn with_capacity(capacity: usize) -> Self {
let layout = Layout::array::<T>(capacity).expect("LBuffer layout overflow");
let alloc = Vec64Alloc::default();
let raw = alloc.allocate(layout).expect("LBuffer allocation failed");
let base = raw.cast::<T>();
Self {
inner: Arc::new(LBufferInner {
base,
capacity,
len: AtomicUsize::new(0),
sealed: AtomicBool::new(false),
mask_tail: None,
}),
mask: None,
}
}
pub fn with_capacity_masked(capacity: usize) -> LBuffer<T, true> {
let alloc = Vec64Alloc::default();
let vlayout = Layout::array::<T>(capacity).expect("LBuffer layout overflow");
let base = alloc
.allocate(vlayout)
.expect("LBuffer allocation failed")
.cast::<T>();
let inner = Arc::new(LBufferInner {
base,
capacity,
len: AtomicUsize::new(0),
sealed: AtomicBool::new(false),
mask_tail: None,
});
let n_bytes = (capacity + 7) / 8;
let mlayout = Layout::array::<u8>(n_bytes).expect("LBuffer mask layout overflow");
let mbase = alloc
.allocate(mlayout)
.expect("LBuffer mask allocation failed")
.cast::<u8>();
let mask = Arc::new(LBufferInner {
base: mbase,
capacity: n_bytes,
len: AtomicUsize::new(0),
sealed: AtomicBool::new(false),
mask_tail: Some(LMaskTail::new()),
});
LBuffer {
inner,
mask: Some(mask),
}
}
}
impl<T, const MASK: bool> LBuffer<T, MASK> {
pub fn push(&mut self, value: T) -> Result<(), T> {
if self.inner.sealed.load(Ordering::Acquire) {
return Err(value);
}
let n = self.inner.len.load(Ordering::Relaxed);
if n == self.inner.capacity {
return Err(value);
}
unsafe {
self.inner.base.as_ptr().add(n).write(value);
}
if MASK {
self.advance_mask(true);
}
self.inner.len.store(n + 1, Ordering::Release);
Ok(())
}
pub fn push_null(&mut self) -> Result<(), ()>
where
T: Default,
{
if !MASK || self.inner.sealed.load(Ordering::Acquire) {
return Err(());
}
let n = self.inner.len.load(Ordering::Relaxed);
if n == self.inner.capacity {
return Err(());
}
unsafe {
self.inner.base.as_ptr().add(n).write(T::default());
}
self.advance_mask(false);
self.inner.len.store(n + 1, Ordering::Release);
Ok(())
}
pub fn push_nulls(&mut self, count: usize) -> Result<(), ()>
where
T: Default,
{
if !MASK || self.inner.sealed.load(Ordering::Acquire) {
return Err(());
}
if count == 0 {
return Ok(());
}
let n = self.inner.len.load(Ordering::Relaxed);
match n.checked_add(count) {
Some(end) if end <= self.inner.capacity => {}
_ => return Err(()),
}
for i in 0..count {
unsafe {
self.inner.base.as_ptr().add(n + i).write(T::default());
}
self.advance_mask(false);
}
self.inner.len.store(n + count, Ordering::Release);
Ok(())
}
#[inline]
fn advance_mask(&self, valid: bool) {
let mask = self
.mask
.as_ref()
.expect("masked buffer carries a validity cell");
let tail = mask
.mask_tail
.as_ref()
.expect("validity buffer carries a tail");
let (index, count) = tail.load_relaxed();
let byte = unsafe { AtomicU8::from_ptr(mask.base.as_ptr().add(index)) };
if count == 0 {
byte.store(0xFF, Ordering::Relaxed);
}
if !valid {
byte.fetch_and(!(1u8 << count), Ordering::Relaxed);
}
if count + 1 == 8 {
tail.publish(index + 1, 0);
} else {
tail.publish(index, count + 1);
}
}
pub fn len(&self) -> usize {
self.inner.len.load(Ordering::Acquire)
}
pub fn capacity(&self) -> usize {
self.inner.capacity
}
pub fn is_full(&self) -> bool {
self.len() == self.inner.capacity
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn view(&self) -> LBufferV<T> {
LBufferV {
inner: Arc::clone(&self.inner),
}
}
pub fn as_buffer(&self) -> Buffer<T> {
Buffer::from_lbuffer(self.view())
}
pub fn seal(&mut self) {
self.inner.sealed.store(true, Ordering::Release);
if MASK {
if let Some(mask) = self.mask.as_ref() {
self.finalise_mask_byte(mask);
mask.sealed.store(true, Ordering::Release);
}
}
}
#[inline]
fn finalise_mask_byte(&self, mask: &Arc<LBufferInner<u8>>) {
let tail = mask
.mask_tail
.as_ref()
.expect("validity buffer carries a tail");
let (index, count) = tail.load_relaxed();
if count > 0 && count < 8 {
let byte = unsafe { AtomicU8::from_ptr(mask.base.as_ptr().add(index)) };
let keep = ((1u16 << count) - 1) as u8;
byte.fetch_and(keep, Ordering::Relaxed);
}
}
pub fn is_sealed(&self) -> bool {
self.inner.sealed.load(Ordering::Acquire)
}
pub fn push_slice(&mut self, values: &[T]) -> Result<(), ()>
where
T: Copy,
{
if self.inner.sealed.load(Ordering::Acquire) {
return Err(());
}
let take = values.len();
if take == 0 {
return Ok(());
}
let n = self.inner.len.load(Ordering::Relaxed);
if n.saturating_add(take) > self.inner.capacity {
return Err(());
}
unsafe {
let dst = self.inner.base.as_ptr().add(n);
std::ptr::copy_nonoverlapping(values.as_ptr(), dst, take);
}
if MASK {
for _ in 0..take {
self.advance_mask(true);
}
}
self.inner.len.store(n + take, Ordering::Release);
Ok(())
}
pub unsafe fn modify_at_unchecked<F: FnOnce(&mut T)>(&mut self, idx: usize, f: F) {
unsafe { f(&mut *self.inner.base.as_ptr().add(idx)) }
}
}
impl<T> LBuffer<T, true> {
pub fn as_bitmask(&self) -> Bitmask {
let mask = self
.mask
.as_ref()
.expect("masked buffer carries a validity cell");
Bitmask::from_lbuffer(LBufferV {
inner: Arc::clone(mask),
})
}
}
impl<T, const MASK: bool> Drop for LBuffer<T, MASK> {
fn drop(&mut self) {
self.inner.sealed.store(true, Ordering::Release);
if MASK {
if let Some(mask) = self.mask.as_ref() {
self.finalise_mask_byte(mask);
mask.sealed.store(true, Ordering::Release);
}
}
}
}
impl<T> Drop for LBufferInner<T> {
fn drop(&mut self) {
let n = *self.len.get_mut();
for i in 0..n {
unsafe { self.base.as_ptr().add(i).drop_in_place() };
}
let layout = Layout::array::<T>(self.capacity).expect("LBufferInner drop layout");
let alloc = Vec64Alloc::default();
unsafe {
alloc.deallocate(self.base.cast::<u8>(), layout);
}
}
}
pub struct LBufferV<T> {
inner: Arc<LBufferInner<T>>,
}
impl<T> LBufferV<T> {
pub fn len(&self) -> usize {
self.inner.len.load(Ordering::Acquire)
}
pub(crate) fn mask_bits(&self) -> Option<usize> {
let (settled, filled) = self.inner.mask_tail.as_ref()?.load();
Some(settled * 8 + filled)
}
pub(crate) fn mask_state(&self) -> Option<(*const u8, usize, usize, bool)> {
let tail = self.inner.mask_tail.as_ref()?;
let (settled, filled) = tail.load();
let base = self.inner.base.as_ptr() as *const u8;
let sealed = self.inner.sealed.load(Ordering::Acquire);
Some((base, settled, filled, sealed))
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn capacity(&self) -> usize {
self.inner.capacity
}
pub fn is_sealed(&self) -> bool {
self.inner.sealed.load(Ordering::Acquire)
}
pub fn get(&self, i: usize) -> Option<&T> {
let n = self.inner.len.load(Ordering::Acquire);
if i >= n {
return None;
}
unsafe { Some(&*self.inner.base.as_ptr().add(i)) }
}
pub unsafe fn get_unchecked(&self, i: usize) -> &T {
unsafe { &*self.inner.base.as_ptr().add(i) }
}
pub fn as_slice(&self) -> &[T] {
let n = self.inner.len.load(Ordering::Acquire);
unsafe { std::slice::from_raw_parts(self.inner.base.as_ptr(), n) }
}
}
impl<T> Clone for LBufferV<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T> AsRef<[T]> for LBufferV<T> {
#[inline]
fn as_ref(&self) -> &[T] {
self.as_slice()
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for LBufferV<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LBufferV")
.field("len", &self.len())
.field("capacity", &self.capacity())
.field("sealed", &self.is_sealed())
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize as DropCount, Ordering};
use std::thread;
#[test]
fn push_and_view_basic() {
let mut buf = LBuffer::<u64>::with_capacity(1024);
for i in 0..100u64 {
buf.push(i).unwrap();
}
let v = buf.view();
assert_eq!(v.len(), 100);
for i in 0..100 {
assert_eq!(v.get(i).copied(), Some(i as u64));
}
assert_eq!(v.as_slice(), &(0u64..100).collect::<Vec<_>>()[..]);
}
#[test]
fn view_observes_the_latest_length() {
let mut buf = LBuffer::<u64>::with_capacity(1024);
let v = buf.view();
assert_eq!(v.len(), 0);
buf.push(1).unwrap();
assert_eq!(v.len(), 1);
buf.push(2).unwrap();
assert_eq!(v.len(), 2);
assert_eq!(v.as_slice(), &[1, 2]);
}
#[test]
fn views_outlive_writer() {
let v = {
let mut buf = LBuffer::<u64>::with_capacity(64);
for i in 0..10u64 {
buf.push(i).unwrap();
}
buf.view()
};
assert_eq!(v.len(), 10);
assert!(v.is_sealed());
assert_eq!(v.get(9).copied(), Some(9));
}
#[test]
fn get_unchecked_with_cached_bound() {
let mut buf = LBuffer::<u64>::with_capacity(1024);
for i in 0..50u64 {
buf.push(i).unwrap();
}
let v = buf.view();
let n = v.len();
let sum: u64 = (0..n).map(|i| unsafe { *v.get_unchecked(i) }).sum();
assert_eq!(sum, (0..50u64).sum::<u64>());
}
#[test]
fn push_returns_err_at_capacity() {
let mut buf = LBuffer::<u8>::with_capacity(4);
for i in 0u8..4 {
buf.push(i).unwrap();
}
assert!(matches!(buf.push(99), Err(99)));
assert_eq!(buf.len(), 4);
}
#[test]
fn many_readers_see_growing_length() {
let mut buf = LBuffer::<u64>::with_capacity(1 << 16);
let view = buf.view();
let stop = Arc::new(AtomicBool::new(false));
let stop_r = Arc::clone(&stop);
let reader = thread::spawn(move || {
let mut last_len = 0usize;
loop {
if stop_r.load(Ordering::Acquire) {
return last_len;
}
let n = view.len();
assert!(n >= last_len);
if n > 0 {
let want = (n - 1) as u64;
assert_eq!(view.get(n - 1).copied(), Some(want));
}
last_len = n;
}
});
for i in 0..50_000u64 {
buf.push(i).unwrap();
}
stop.store(true, Ordering::Release);
let observed = reader.join().unwrap();
assert!(observed <= 50_000);
}
#[test]
fn element_drops_run_when_inner_drops() {
#[derive(Debug)]
struct DropTracker(Arc<DropCount>);
impl Drop for DropTracker {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let counter = Arc::new(DropCount::new(0));
{
let mut buf = LBuffer::<DropTracker>::with_capacity(8);
for _ in 0..5 {
buf.push(DropTracker(Arc::clone(&counter))).unwrap();
}
}
assert_eq!(counter.load(Ordering::SeqCst), 5);
}
#[test]
fn unwritten_capacity_is_not_dropped() {
#[derive(Debug)]
struct DropTracker(Arc<DropCount>);
impl Drop for DropTracker {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let counter = Arc::new(DropCount::new(0));
{
let mut buf = LBuffer::<DropTracker>::with_capacity(64);
for _ in 0..3 {
buf.push(DropTracker(Arc::clone(&counter))).unwrap();
}
}
assert_eq!(counter.load(Ordering::SeqCst), 3);
}
#[test]
fn new_buffer_is_not_sealed() {
let buf = LBuffer::<u8>::with_capacity(8);
assert!(!buf.is_sealed());
let v = buf.view();
assert!(!v.is_sealed());
}
#[test]
fn seal_blocks_subsequent_pushes() {
let mut buf = LBuffer::<u8>::with_capacity(8);
buf.push(1).unwrap();
buf.push(2).unwrap();
buf.seal();
assert!(buf.is_sealed());
assert!(matches!(buf.push(3), Err(3)));
assert_eq!(buf.len(), 2);
}
#[test]
fn view_observes_seal() {
let mut buf = LBuffer::<u8>::with_capacity(8);
let v = buf.view();
assert!(!v.is_sealed());
buf.seal();
assert!(v.is_sealed());
}
#[test]
fn sealed_view_still_reads_data_and_wraps_as_buffer() {
use crate::Buffer;
let mut buf = LBuffer::<i32>::with_capacity(8);
for i in 0..4i32 {
buf.push(i * 10).unwrap();
}
buf.seal();
let view = buf.view();
assert!(view.is_sealed());
assert_eq!(view.len(), 4);
assert_eq!(view.as_slice(), &[0, 10, 20, 30]);
let buffer: Buffer<i32> = Buffer::from_lbuffer(view);
assert_eq!(buffer.len(), 4);
assert_eq!(buffer.as_slice(), &[0, 10, 20, 30]);
}
#[test]
fn dropping_writer_seals_for_views() {
let v = {
let mut buf = LBuffer::<u32>::with_capacity(16);
for i in 0..5u32 {
buf.push(i).unwrap();
}
let v = buf.view();
assert!(!v.is_sealed());
v
};
assert!(v.is_sealed());
assert_eq!(v.len(), 5);
assert_eq!(v.as_slice(), &[0, 1, 2, 3, 4]);
}
#[test]
fn push_slice_appends_in_one_release() {
let mut buf = LBuffer::<u64>::with_capacity(16);
let v = buf.view();
buf.push_slice(&[1, 2, 3, 4, 5]).unwrap();
assert_eq!(buf.len(), 5);
assert_eq!(v.as_slice(), &[1u64, 2, 3, 4, 5]);
buf.push_slice(&[6, 7]).unwrap();
assert_eq!(v.as_slice(), &[1u64, 2, 3, 4, 5, 6, 7]);
}
#[test]
fn push_slice_empty_is_a_noop() {
let mut buf = LBuffer::<u32>::with_capacity(8);
buf.push(1).unwrap();
let len_before = buf.len();
buf.push_slice(&[]).unwrap();
assert_eq!(buf.len(), len_before);
}
#[test]
fn push_slice_at_capacity_returns_err_and_writes_nothing() {
let mut buf = LBuffer::<u8>::with_capacity(4);
buf.push_slice(&[1, 2, 3]).unwrap();
assert!(matches!(buf.push_slice(&[4, 5]), Err(())));
assert_eq!(buf.len(), 3);
assert_eq!(buf.view().as_slice(), &[1u8, 2, 3]);
buf.push_slice(&[4]).unwrap();
assert_eq!(buf.view().as_slice(), &[1u8, 2, 3, 4]);
}
#[test]
fn push_slice_after_seal_returns_err() {
let mut buf = LBuffer::<u16>::with_capacity(8);
buf.push(7).unwrap();
buf.seal();
assert!(matches!(buf.push_slice(&[1, 2]), Err(())));
assert_eq!(buf.len(), 1);
}
#[test]
fn modify_at_unchecked_mutates_in_place() {
let mut buf = LBuffer::<u8>::with_capacity(8);
buf.push_slice(&[0u8; 4]).unwrap();
unsafe {
buf.modify_at_unchecked(0, |b| *b |= 0b0000_0001);
buf.modify_at_unchecked(1, |b| *b |= 0b1000_0000);
}
let v = buf.view();
assert_eq!(v.as_slice(), &[0b0000_0001, 0b1000_0000, 0, 0]);
}
#[test]
fn allocation_freed_when_last_handle_drops() {
#[derive(Debug)]
struct DropTracker(Arc<DropCount>);
impl Drop for DropTracker {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let counter = Arc::new(DropCount::new(0));
let view = {
let mut buf = LBuffer::<DropTracker>::with_capacity(8);
for _ in 0..4 {
buf.push(DropTracker(Arc::clone(&counter))).unwrap();
}
buf.view()
};
assert_eq!(counter.load(Ordering::SeqCst), 0, "view keeps cell alive");
drop(view);
assert_eq!(counter.load(Ordering::SeqCst), 4);
}
#[test]
fn masked_push_and_null_read_via_bitmask() {
let mut buf = LBuffer::<f64>::with_capacity_masked(64);
buf.push(1.0).unwrap();
buf.push_null().unwrap();
buf.push(2.5).unwrap();
let mask = buf.as_bitmask();
assert_eq!(mask.len(), 3);
assert!(mask.get(0));
assert!(!mask.get(1));
assert!(mask.get(2));
assert_eq!(mask.count_zeros(), 1);
assert!(mask.has_nulls());
assert_eq!(buf.as_buffer().as_slice(), &[1.0, 0.0, 2.5]);
}
#[test]
fn masked_bitmask_tracks_writer() {
let mut buf = LBuffer::<i64>::with_capacity_masked(64);
let mask = buf.as_bitmask();
assert_eq!(mask.len(), 0);
buf.push(1).unwrap();
assert_eq!(mask.len(), 1);
assert!(mask.get(0));
buf.push_null().unwrap();
assert_eq!(mask.len(), 2);
assert!(!mask.get(1));
assert_eq!(mask.count_zeros(), 1);
}
#[test]
fn masked_crossover_settled_and_trailing() {
let mut buf = LBuffer::<i64>::with_capacity_masked(16);
for i in 0..12i64 {
if i == 3 || i == 7 || i == 10 {
buf.push_null().unwrap();
} else {
buf.push(i * 10).unwrap();
}
}
let mask = buf.as_bitmask();
assert_eq!(mask.len(), 12);
for i in 0..12 {
let is_null = i == 3 || i == 7 || i == 10;
assert_eq!(mask.get(i), !is_null, "bit {i}");
}
assert_eq!(mask.count_zeros(), 3);
}
#[test]
fn masked_push_nulls_bulk() {
let mut buf = LBuffer::<u32>::with_capacity_masked(32);
buf.push(5).unwrap();
buf.push_nulls(3).unwrap();
buf.push(9).unwrap();
let mask = buf.as_bitmask();
assert_eq!(mask.len(), 5);
assert!(mask.get(0));
assert!(!mask.get(1) && !mask.get(2) && !mask.get(3));
assert!(mask.get(4));
assert_eq!(mask.count_zeros(), 3);
}
#[test]
fn masked_all_valid_has_no_nulls() {
let mut buf = LBuffer::<i64>::with_capacity_masked(16);
for i in 0..10i64 {
buf.push(i).unwrap();
}
let mask = buf.as_bitmask();
assert_eq!(mask.len(), 10);
assert!(!mask.has_nulls());
assert_eq!(mask.count_zeros(), 0);
assert!((0..10).all(|i| mask.get(i)));
}
#[test]
fn push_null_on_unmasked_is_err() {
let mut buf = LBuffer::<i64>::with_capacity(8);
assert!(buf.push_null().is_err());
assert!(buf.push_nulls(2).is_err());
}
#[test]
fn masked_integration_with_float_array() {
use crate::{FloatArray, MaskedArray};
let mut price = LBuffer::<f64>::with_capacity_masked(64);
price.push(100.0).unwrap();
price.push_null().unwrap();
price.push(101.5).unwrap();
let arr = FloatArray::<f64> {
data: price.as_buffer(),
null_mask: Some(price.as_bitmask()),
};
assert_eq!(arr.len(), 3);
assert!(!arr.is_null(0));
assert!(arr.is_null(1));
assert!(!arr.is_null(2));
assert_eq!(arr.null_count(), 1);
}
#[test]
fn masked_as_slice_is_settled_while_unsealed_then_complete_on_seal() {
let mut buf = LBuffer::<i64>::with_capacity_masked(64);
for i in 0..10i64 {
if i == 3 || i == 9 {
buf.push_null().unwrap();
} else {
buf.push(i).unwrap();
}
}
let mask = buf.as_bitmask();
assert_eq!(mask.len(), 10);
assert_eq!(mask.as_slice(), &[0b1111_0111]); assert!(mask.get(8) && !mask.get(9)); assert_eq!(mask.count_zeros(), 2);
buf.seal();
let mask = buf.as_bitmask();
assert_eq!(mask.len(), 10);
assert_eq!(mask.as_slice(), &[0b1111_0111, 0b0000_0001]);
assert_eq!(mask.count_zeros(), 2);
}
#[test]
fn masked_byte_and_unchecked_accessors_route_while_unsealed() {
let mut buf = LBuffer::<i64>::with_capacity_masked(64);
for i in 0..10i64 {
if i == 3 || i == 9 {
buf.push_null().unwrap();
} else {
buf.push(i).unwrap();
}
}
let mask = buf.as_bitmask();
assert_eq!(mask.as_ref(), &[0b1111_0111]);
assert_eq!(&*mask, &[0b1111_0111]);
assert_eq!(mask.buffer(), &[0b1111_0111]);
for i in 0..10usize {
let want = i != 3 && i != 9;
assert_eq!(unsafe { mask.get_unchecked(i) }, want, "bit {i}");
}
assert_eq!(unsafe { mask.get_unchecked_byte(0) }, 0b1111_0111);
assert_eq!(mask.iter_cleared().collect::<Vec<_>>(), vec![3, 9]);
assert!(format!("{mask}").contains("10 bits"));
}
#[test]
fn kernel_validity_merge_over_unsealed_masks() {
use crate::kernels::bitmask::merge_bitmasks_to_new;
let mut a = LBuffer::<f64>::with_capacity_masked(64);
let mut b = LBuffer::<f64>::with_capacity_masked(64);
for i in 0..20i64 {
if i % 5 == 0 {
a.push_null().unwrap();
} else {
a.push(i as f64).unwrap();
}
if i % 7 == 0 {
b.push_null().unwrap();
} else {
b.push(i as f64 * 2.0).unwrap();
}
}
let a_mask = a.as_bitmask();
let b_mask = b.as_bitmask();
let merged = merge_bitmasks_to_new(Some(&a_mask), Some(&b_mask), 20).unwrap();
for i in 0..20usize {
let want = !(i % 5 == 0 || i % 7 == 0);
assert_eq!(merged.get(i), want, "row {i}");
}
let a_vals = a.as_buffer();
assert_eq!(a_vals.len(), 20);
assert_eq!(a_vals.as_slice()[6], 6.0);
}
}