use std::convert::TryInto;
use std::fmt;
use std::marker::PhantomData;
use std::ops::Range;
use std::os::raw::c_char;
use std::ptr;
use cxx::UniquePtr;
use rust_decimal::Decimal;
use errors::{OrcError, OrcResult};
use memorypool;
/// Implements `fmt::Debug` for a newtype wrapper around an FFI value.
///
/// The wrapped value (`self.0`) is rendered with `$function_name`, whose result
/// must implement `Display`. The output looks like `StructName { <rendering> }`.
/// The first arm is for wrappers without generics, the second for wrappers
/// with exactly one lifetime parameter.
macro_rules! impl_debug {
    ($struct_name:ident, $function_name:path) => {
        impl fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                let rendered = $function_name(&self.0);
                write!(f, "{} {{ {} }}", stringify!($struct_name), rendered)
            }
        }
    };
    ($struct_name:ident<$lifetime:lifetime>, $function_name:path) => {
        impl<$lifetime> fmt::Debug for $struct_name<$lifetime> {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                let rendered = $function_name(&self.0);
                write!(f, "{} {{ {} }}", stringify!($struct_name), rendered)
            }
        }
    };
}
/// Implements the upcast from a typed vector batch wrapper to
/// `BorrowedColumnVectorBatch`, plus the `ColumnVectorBatch` trait on top of it.
///
/// `$function_name` must convert a reference to the wrapped C++ batch into a
/// reference to its `ColumnVectorBatch` base with the same lifetime.
///
/// Every lifetime in the generated code is the caller-supplied `$lifetime`
/// (rather than a hard-coded `'a`), so the macro works whatever the caller
/// names its lifetime parameter.
macro_rules! impl_upcast {
    ($struct_name:ident<$lifetime:lifetime>, $function_name:path) => {
        impl<$lifetime> From<&$struct_name<$lifetime>> for BorrowedColumnVectorBatch<$lifetime> {
            fn from(vector_batch: &$struct_name<$lifetime>) -> Self {
                BorrowedColumnVectorBatch($function_name(vector_batch.0))
            }
        }
        impl<$lifetime> ColumnVectorBatch<$lifetime> for $struct_name<$lifetime> {
            fn inner(&self) -> &$lifetime ffi::ColumnVectorBatch {
                let untyped_vector_batch: BorrowedColumnVectorBatch<$lifetime> = self.into();
                untyped_vector_batch.0
            }
        }
    };
}
// cxx bridge to ORC's C++ vector batch classes and to small C++ helpers
// (from "cpp-utils.hh") that expose their fields and perform casts.
#[cxx::bridge]
pub(crate) mod ffi {
// Data buffer types, re-used from the `memorypool` module's own bridge.
#[namespace = "orcxx_rs"]
unsafe extern "C++" {
type Int64DataBuffer = crate::memorypool::ffi::Int64DataBuffer;
type Int128DataBuffer = crate::memorypool::ffi::Int128DataBuffer;
type DoubleDataBuffer = crate::memorypool::ffi::DoubleDataBuffer;
type StringDataBuffer = crate::memorypool::ffi::StringDataBuffer;
type CharDataBuffer = crate::memorypool::ffi::CharDataBuffer;
}
// ORC's batch classes, used as opaque types from Rust.
#[namespace = "orc"]
unsafe extern "C++" {
include!("cpp-utils.hh");
include!("orc/Vector.hh");
type ColumnVectorBatch;
type LongVectorBatch;
type DoubleVectorBatch;
type StringVectorBatch;
type TimestampVectorBatch;
type Decimal64VectorBatch;
type Decimal128VectorBatch;
type StructVectorBatch;
type ListVectorBatch;
type MapVectorBatch;
}
// Explicitly instantiates cxx's `UniquePtr` support for `ColumnVectorBatch`.
impl UniquePtr<ColumnVectorBatch> {}
// Element type of a struct batch's field list; `ColumnVectorBatchPtr_make_ptr`
// turns one element into a raw pointer to the field's batch.
#[namespace = "orcxx_rs"]
unsafe extern "C++" {
type ColumnVectorBatchPtr;
#[namespace = "orcxx_rs::utils"]
#[rust_name = "ColumnVectorBatchPtr_make_ptr"]
fn into(batch_ptr: &ColumnVectorBatchPtr) -> *const ColumnVectorBatch;
}
// Field accessors. The C++ names mirror ORC's members; `rust_name` gives each
// overload of the same C++ name a distinct Rust name.
#[namespace = "orcxx_rs::accessors"]
unsafe extern "C++" {
fn get_numElements(vectorBatch: &ColumnVectorBatch) -> u64;
fn get_hasNulls(vectorBatch: &ColumnVectorBatch) -> bool;
fn get_notNull(vectorBatch: &ColumnVectorBatch) -> &CharDataBuffer;
#[rust_name = "LongVectorBatch_get_data"]
fn get_data(vectorBatch: &LongVectorBatch) -> &Int64DataBuffer;
#[rust_name = "DoubleVectorBatch_get_data"]
fn get_data(vectorBatch: &DoubleVectorBatch) -> &DoubleDataBuffer;
#[rust_name = "StringVectorBatch_get_data"]
fn get_data(vectorBatch: &StringVectorBatch) -> &StringDataBuffer;
#[rust_name = "StringVectorBatch_get_length"]
fn get_length(vectorBatch: &StringVectorBatch) -> &Int64DataBuffer;
#[rust_name = "StringVectorBatch_get_blob"]
fn get_blob(vectorBatch: &StringVectorBatch) -> &CharDataBuffer;
#[rust_name = "TimestampVectorBatch_get_data"]
fn get_data(vectorBatch: &TimestampVectorBatch) -> &Int64DataBuffer;
#[rust_name = "TimestampVectorBatch_get_nanoseconds"]
fn get_nanoseconds(vectorBatch: &TimestampVectorBatch) -> &Int64DataBuffer;
#[rust_name = "Decimal64VectorBatch_get_values"]
fn get_values(vectorBatch: &Decimal64VectorBatch) -> &Int64DataBuffer;
#[rust_name = "Decimal64VectorBatch_get_precision"]
fn get_precision(vectorBatch: &Decimal64VectorBatch) -> i32;
#[rust_name = "Decimal64VectorBatch_get_scale"]
fn get_scale(vectorBatch: &Decimal64VectorBatch) -> i32;
#[rust_name = "Decimal128VectorBatch_get_values"]
fn get_values(vectorBatch: &Decimal128VectorBatch) -> &Int128DataBuffer;
#[rust_name = "Decimal128VectorBatch_get_precision"]
fn get_precision(vectorBatch: &Decimal128VectorBatch) -> i32;
#[rust_name = "Decimal128VectorBatch_get_scale"]
fn get_scale(vectorBatch: &Decimal128VectorBatch) -> i32;
#[rust_name = "StructVectorBatch_get_fields"]
fn get_fields(vectorBatch: &StructVectorBatch) -> &CxxVector<ColumnVectorBatchPtr>;
#[rust_name = "ListVectorBatch_get_elements"]
fn get_elements(vectorBatch: &ListVectorBatch) -> &UniquePtr<ColumnVectorBatch>;
#[rust_name = "ListVectorBatch_get_offsets"]
fn get_offsets(vectorBatch: &ListVectorBatch) -> &Int64DataBuffer;
#[rust_name = "MapVectorBatch_get_keys"]
fn get_keys(vectorBatch: &MapVectorBatch) -> &UniquePtr<ColumnVectorBatch>;
#[rust_name = "MapVectorBatch_get_elements"]
fn get_elements(vectorBatch: &MapVectorBatch) -> &UniquePtr<ColumnVectorBatch>;
#[rust_name = "MapVectorBatch_get_offsets"]
fn get_offsets(vectorBatch: &MapVectorBatch) -> &Int64DataBuffer;
}
// Casts between the untyped `ColumnVectorBatch` and the typed batches.
#[namespace = "orcxx_rs::utils"]
unsafe extern "C++" {
// Downcasts: these return `Result`, so a C++ exception becomes an `Err`.
// NOTE(review): presumably the helper throws when the batch's dynamic type
// does not match; confirm in cpp-utils.hh.
#[rust_name = "try_into_LongVectorBatch"]
fn try_into(vectorBatch: &ColumnVectorBatch) -> Result<&LongVectorBatch>;
#[rust_name = "try_into_DoubleVectorBatch"]
fn try_into(vectorBatch: &ColumnVectorBatch) -> Result<&DoubleVectorBatch>;
#[rust_name = "try_into_StringVectorBatch"]
fn try_into(vectorBatch: &ColumnVectorBatch) -> Result<&StringVectorBatch>;
#[rust_name = "try_into_TimestampVectorBatch"]
fn try_into(vectorBatch: &ColumnVectorBatch) -> Result<&TimestampVectorBatch>;
#[rust_name = "try_into_Decimal64VectorBatch"]
fn try_into(vectorBatch: &ColumnVectorBatch) -> Result<&Decimal64VectorBatch>;
#[rust_name = "try_into_Decimal128VectorBatch"]
fn try_into(vectorBatch: &ColumnVectorBatch) -> Result<&Decimal128VectorBatch>;
#[rust_name = "try_into_StructVectorBatch"]
fn try_into(vectorBatch: &ColumnVectorBatch) -> Result<&StructVectorBatch>;
#[rust_name = "try_into_ListVectorBatch"]
fn try_into(vectorBatch: &ColumnVectorBatch) -> Result<&ListVectorBatch>;
#[rust_name = "try_into_MapVectorBatch"]
fn try_into(vectorBatch: &ColumnVectorBatch) -> Result<&MapVectorBatch>;
// Upcasts to the base class. They share the C++ name `try_into` but are
// declared infallible (no `Result`).
#[rust_name = "LongVectorBatch_into_ColumnVectorBatch"]
fn try_into(vectorBatch: &LongVectorBatch) -> &ColumnVectorBatch;
#[rust_name = "DoubleVectorBatch_into_ColumnVectorBatch"]
fn try_into(vectorBatch: &DoubleVectorBatch) -> &ColumnVectorBatch;
#[rust_name = "StringVectorBatch_into_ColumnVectorBatch"]
fn try_into(vectorBatch: &StringVectorBatch) -> &ColumnVectorBatch;
#[rust_name = "TimestampVectorBatch_into_ColumnVectorBatch"]
fn try_into(vectorBatch: &TimestampVectorBatch) -> &ColumnVectorBatch;
#[rust_name = "Decimal64VectorBatch_into_ColumnVectorBatch"]
fn try_into(vectorBatch: &Decimal64VectorBatch) -> &ColumnVectorBatch;
#[rust_name = "Decimal128VectorBatch_into_ColumnVectorBatch"]
fn try_into(vectorBatch: &Decimal128VectorBatch) -> &ColumnVectorBatch;
#[rust_name = "StructVectorBatch_into_ColumnVectorBatch"]
fn try_into(vectorBatch: &StructVectorBatch) -> &ColumnVectorBatch;
#[rust_name = "ListVectorBatch_into_ColumnVectorBatch"]
fn try_into(vectorBatch: &ListVectorBatch) -> &ColumnVectorBatch;
#[rust_name = "MapVectorBatch_into_ColumnVectorBatch"]
fn try_into(vectorBatch: &MapVectorBatch) -> &ColumnVectorBatch;
// Human-readable renderings of each batch type, used by the `Debug` impls.
#[rust_name = "ColumnVectorBatch_toString"]
fn toString(type_: &ColumnVectorBatch) -> UniquePtr<CxxString>;
#[rust_name = "LongVectorBatch_toString"]
fn toString(type_: &LongVectorBatch) -> UniquePtr<CxxString>;
#[rust_name = "DoubleVectorBatch_toString"]
fn toString(type_: &DoubleVectorBatch) -> UniquePtr<CxxString>;
#[rust_name = "StringVectorBatch_toString"]
fn toString(type_: &StringVectorBatch) -> UniquePtr<CxxString>;
#[rust_name = "TimestampVectorBatch_toString"]
fn toString(type_: &TimestampVectorBatch) -> UniquePtr<CxxString>;
#[rust_name = "Decimal64VectorBatch_toString"]
fn toString(type_: &Decimal64VectorBatch) -> UniquePtr<CxxString>;
#[rust_name = "Decimal128VectorBatch_toString"]
fn toString(type_: &Decimal128VectorBatch) -> UniquePtr<CxxString>;
#[rust_name = "StructVectorBatch_toString"]
fn toString(type_: &StructVectorBatch) -> UniquePtr<CxxString>;
#[rust_name = "ListVectorBatch_toString"]
fn toString(type_: &ListVectorBatch) -> UniquePtr<CxxString>;
#[rust_name = "MapVectorBatch_toString"]
fn toString(type_: &MapVectorBatch) -> UniquePtr<CxxString>;
}
}
/// Shared interface of all vector batch views: access to the underlying C++
/// `ColumnVectorBatch`, its row count and its null mask.
pub trait ColumnVectorBatch<'a> {
    /// Returns the untyped C++ batch this value refers to.
    fn inner(&self) -> &'a ffi::ColumnVectorBatch;

    /// Returns the number of rows, as reported by the C++ batch.
    fn num_elements(&self) -> u64 {
        let batch = self.inner();
        ffi::get_numElements(batch)
    }

    /// Returns the null mask as one byte per row (a zero byte marks a null row),
    /// or `None` when the C++ batch reports no nulls.
    ///
    /// # Panics
    ///
    /// Panics if the row count does not fit in `usize`.
    fn not_null(&self) -> Option<&'a [i8]> {
        let mask = self.not_null_ptr()?;
        let row_count: usize = self
            .num_elements()
            .try_into()
            .expect("could not convert u64 to usize");
        // SAFETY: relies on the C++ null mask holding at least `row_count` bytes
        // that stay valid for `'a`; this is not checked here.
        Some(unsafe { std::slice::from_raw_parts(mask.as_ptr(), row_count) })
    }

    /// Returns a pointer to the null mask, or `None` when the C++ batch reports
    /// no nulls.
    ///
    /// # Panics
    ///
    /// Panics if the batch reports nulls but its mask buffer pointer is null.
    fn not_null_ptr(&self) -> Option<ptr::NonNull<i8>> {
        if !ffi::get_hasNulls(self.inner()) {
            return None;
        }
        let mask = ffi::get_notNull(self.inner()).data();
        assert_ne!(mask, ptr::null());
        // SAFETY: `mask` was checked to be non-null just above.
        Some(unsafe { ptr::NonNull::new_unchecked(mask as *mut i8) })
    }
}
/// A C++ `ColumnVectorBatch` owned through a `UniquePtr`.
pub struct OwnedColumnVectorBatch(pub(crate) UniquePtr<ffi::ColumnVectorBatch>);

impl_debug!(OwnedColumnVectorBatch, ffi::ColumnVectorBatch_toString);

impl<'a> ColumnVectorBatch<'a> for &'a OwnedColumnVectorBatch {
    fn inner(&self) -> &'a ffi::ColumnVectorBatch {
        &self.0
    }
}

impl OwnedColumnVectorBatch {
    /// Borrows this batch as an untyped [`BorrowedColumnVectorBatch`], from
    /// which typed views can be obtained.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped `UniquePtr` is null (cxx's `UniquePtr` deref).
    pub fn borrow(&self) -> BorrowedColumnVectorBatch {
        let untyped: &ffi::ColumnVectorBatch = &self.0;
        BorrowedColumnVectorBatch(untyped)
    }
}

// NOTE(review): asserts that the owned C++ batch may be moved to another
// thread; the justification is not visible in this file.
unsafe impl Send for OwnedColumnVectorBatch {}
pub struct BorrowedColumnVectorBatch<'a>(&'a ffi::ColumnVectorBatch);
impl_debug!(
BorrowedColumnVectorBatch<'a>,
ffi::ColumnVectorBatch_toString
);
impl<'a> ColumnVectorBatch<'a> for BorrowedColumnVectorBatch<'a> {
fn inner(&self) -> &'a ffi::ColumnVectorBatch {
self.0
}
}
impl<'a> BorrowedColumnVectorBatch<'a> {
pub fn try_into_longs(&self) -> OrcResult<LongVectorBatch<'a>> {
ffi::try_into_LongVectorBatch(self.0)
.map_err(OrcError)
.map(LongVectorBatch)
}
pub fn try_into_doubles(&self) -> OrcResult<DoubleVectorBatch<'a>> {
ffi::try_into_DoubleVectorBatch(self.0)
.map_err(OrcError)
.map(DoubleVectorBatch)
}
pub fn try_into_strings(&self) -> OrcResult<StringVectorBatch<'a>> {
ffi::try_into_StringVectorBatch(self.0)
.map_err(OrcError)
.map(StringVectorBatch)
}
pub fn try_into_timestamps(&self) -> OrcResult<TimestampVectorBatch<'a>> {
ffi::try_into_TimestampVectorBatch(self.0)
.map_err(OrcError)
.map(TimestampVectorBatch)
}
pub fn try_into_decimals64(&self) -> OrcResult<Decimal64VectorBatch<'a>> {
ffi::try_into_Decimal64VectorBatch(self.0)
.map_err(OrcError)
.map(Decimal64VectorBatch)
}
pub fn try_into_decimals128(&self) -> OrcResult<Decimal128VectorBatch<'a>> {
ffi::try_into_Decimal128VectorBatch(self.0)
.map_err(OrcError)
.map(Decimal128VectorBatch)
}
pub fn try_into_structs(&self) -> OrcResult<StructVectorBatch<'a>> {
ffi::try_into_StructVectorBatch(self.0)
.map_err(OrcError)
.map(StructVectorBatch)
}
pub fn try_into_lists(&self) -> OrcResult<ListVectorBatch<'a>> {
ffi::try_into_ListVectorBatch(self.0)
.map_err(OrcError)
.map(ListVectorBatch)
}
pub fn try_into_maps(&self) -> OrcResult<MapVectorBatch<'a>> {
ffi::try_into_MapVectorBatch(self.0)
.map_err(OrcError)
.map(MapVectorBatch)
}
}
unsafe impl<'a> Send for BorrowedColumnVectorBatch<'a> {}
/// Typed view of a C++ `StructVectorBatch`.
pub struct StructVectorBatch<'a>(&'a ffi::StructVectorBatch);

impl_debug!(StructVectorBatch<'a>, ffi::StructVectorBatch_toString);
impl_upcast!(
    StructVectorBatch<'a>,
    ffi::StructVectorBatch_into_ColumnVectorBatch
);

impl<'a> StructVectorBatch<'a> {
    /// Returns one untyped batch per struct field, in the order the C++ batch
    /// lists them.
    pub fn fields(&self) -> Vec<BorrowedColumnVectorBatch<'a>> {
        let field_ptrs = ffi::StructVectorBatch_get_fields(self.0);
        let mut fields = Vec::with_capacity(field_ptrs.len());
        for field_ptr in field_ptrs.iter() {
            // SAFETY: assumes the C++ side yields a non-null pointer to a field
            // batch owned by the struct batch, hence valid for `'a` (not checked).
            let field: &'a ffi::ColumnVectorBatch =
                unsafe { &*ffi::ColumnVectorBatchPtr_make_ptr(field_ptr) };
            fields.push(BorrowedColumnVectorBatch(field));
        }
        fields
    }
}

// NOTE(review): asserts thread-safety of the underlying C++ batch; not
// justified in this file.
unsafe impl<'a> Send for StructVectorBatch<'a> {}
/// Typed view of a C++ `LongVectorBatch` (64-bit integers).
pub struct LongVectorBatch<'a>(&'a ffi::LongVectorBatch);

impl_debug!(LongVectorBatch<'a>, ffi::LongVectorBatch_toString);
impl_upcast!(
    LongVectorBatch<'a>,
    ffi::LongVectorBatch_into_ColumnVectorBatch
);

impl<'a> LongVectorBatch<'a> {
    /// Iterates over the rows, yielding `None` for null rows.
    pub fn iter(&self) -> LongVectorBatchIterator {
        let buffer = ffi::LongVectorBatch_get_data(self.0);
        let row_count = self.num_elements();
        let mask = self.not_null_ptr();
        // SAFETY: buffer, mask and row count all come from the same C++ batch,
        // which outlives the returned iterator.
        unsafe { LongVectorBatchIterator::new(buffer, mask, row_count) }
    }

    /// Iterates over the rows without null handling, or returns `None` if the
    /// C++ batch reports nulls.
    pub fn try_iter_not_null(&self) -> Option<NotNullLongVectorBatchIterator> {
        let buffer = ffi::LongVectorBatch_get_data(self.0);
        let row_count = self.num_elements();
        match self.not_null_ptr() {
            Some(_) => None,
            // SAFETY: buffer and row count come from the same C++ batch.
            None => Some(unsafe { NotNullLongVectorBatchIterator::new(buffer, row_count) }),
        }
    }
}

// NOTE(review): asserts thread-safety of the underlying C++ batch; not
// justified in this file.
unsafe impl<'a> Send for LongVectorBatch<'a> {}
/// Iterator over a [`LongVectorBatch`], yielding `None` for null rows.
///
/// The data buffer and the null mask are both indexed by row number: the C++
/// batch keeps a value slot for every row, including null rows, whose slot
/// content is unspecified. (Compacting the index over non-null rows would
/// read the wrong slot for every row after the first null.)
#[derive(Debug, Clone)]
pub struct LongVectorBatchIterator<'a> {
    batch: PhantomData<&'a LongVectorBatch<'a>>,
    /// Row number of the next item.
    index: isize,
    data: *const i64,
    not_null: Option<ptr::NonNull<i8>>,
    num_elements: isize,
}

impl<'a> LongVectorBatchIterator<'a> {
    /// # Safety
    ///
    /// `data_buffer` must hold one value per row for `num_elements` rows and,
    /// if given, `not_null` must point to `num_elements` bytes; both must stay
    /// valid for `'a`.
    ///
    /// # Panics
    ///
    /// Panics if `num_elements` does not fit in `isize`.
    unsafe fn new(
        data_buffer: &memorypool::ffi::Int64DataBuffer,
        not_null: Option<ptr::NonNull<i8>>,
        num_elements: u64,
    ) -> LongVectorBatchIterator<'a> {
        LongVectorBatchIterator {
            batch: PhantomData,
            index: 0,
            data: data_buffer.data(),
            not_null,
            num_elements: num_elements
                .try_into()
                .expect("could not convert u64 to isize"),
        }
    }
}

impl<'a> Iterator for LongVectorBatchIterator<'a> {
    type Item = Option<i64>;

    fn next(&mut self) -> Option<Option<i64>> {
        if self.index >= self.num_elements {
            return None;
        }
        let row = self.index;
        self.index += 1;
        if let Some(not_null) = self.not_null {
            // SAFETY: `row < num_elements`, and the null mask has one byte per row.
            if unsafe { *not_null.as_ptr().offset(row) } == 0 {
                return Some(None);
            }
        }
        // SAFETY: `row < num_elements`, and the data buffer has one slot per row.
        Some(Some(unsafe { *self.data.offset(row) }))
    }
}
/// Iterator over a [`LongVectorBatch`] that contains no nulls.
#[derive(Debug, Clone)]
pub struct NotNullLongVectorBatchIterator<'a> {
    batch: PhantomData<&'a LongVectorBatch<'a>>,
    /// Index of the next value to yield.
    index: isize,
    data: *const i64,
    num_elements: isize,
}

impl<'a> NotNullLongVectorBatchIterator<'a> {
    /// # Safety
    ///
    /// `data_buffer` must hold at least `num_elements` values valid for `'a`.
    ///
    /// # Panics
    ///
    /// Panics if `num_elements` does not fit in `isize`.
    unsafe fn new(
        data_buffer: &memorypool::ffi::Int64DataBuffer,
        num_elements: u64,
    ) -> NotNullLongVectorBatchIterator<'a> {
        let data = data_buffer.data();
        let num_elements: isize = num_elements
            .try_into()
            .expect("could not convert u64 to isize");
        NotNullLongVectorBatchIterator {
            batch: PhantomData,
            index: 0,
            data,
            num_elements,
        }
    }
}

impl<'a> Iterator for NotNullLongVectorBatchIterator<'a> {
    type Item = i64;

    fn next(&mut self) -> Option<i64> {
        if self.index < self.num_elements {
            // SAFETY: `index < num_elements`, the buffer length.
            let value = unsafe { *self.data.offset(self.index) };
            self.index += 1;
            Some(value)
        } else {
            None
        }
    }
}
pub struct DoubleVectorBatch<'a>(&'a ffi::DoubleVectorBatch);
impl_debug!(DoubleVectorBatch<'a>, ffi::DoubleVectorBatch_toString);
impl_upcast!(
DoubleVectorBatch<'a>,
ffi::DoubleVectorBatch_into_ColumnVectorBatch
);
impl<'a> DoubleVectorBatch<'a> {
pub fn iter(&self) -> DoubleVectorBatchIterator {
let data = ffi::DoubleVectorBatch_get_data(self.0).data();
let vector_batch =
BorrowedColumnVectorBatch(ffi::DoubleVectorBatch_into_ColumnVectorBatch(self.0));
let num_elements = vector_batch.num_elements();
let not_null = vector_batch.not_null_ptr();
DoubleVectorBatchIterator {
batch: PhantomData,
data_index: 0,
not_null_index: 0,
data,
not_null,
num_elements: num_elements
.try_into()
.expect("could not convert u64 to isize"),
}
}
pub fn try_iter_not_null(&self) -> Option<NotNullDoubleVectorBatchIterator> {
let data = ffi::DoubleVectorBatch_get_data(self.0).data();
let vector_batch =
BorrowedColumnVectorBatch(ffi::DoubleVectorBatch_into_ColumnVectorBatch(self.0));
let num_elements = vector_batch.num_elements();
if vector_batch.not_null_ptr().is_some() {
None
} else {
Some(NotNullDoubleVectorBatchIterator {
batch: PhantomData,
index: 0,
data,
num_elements: num_elements
.try_into()
.expect("could not convert u64 to isize"),
})
}
}
}
unsafe impl<'a> Send for DoubleVectorBatch<'a> {}
#[derive(Debug, Clone)]
pub struct DoubleVectorBatchIterator<'a> {
batch: PhantomData<&'a DoubleVectorBatch<'a>>,
data_index: isize,
not_null_index: isize,
data: *const f64,
not_null: Option<ptr::NonNull<i8>>,
num_elements: isize,
}
impl<'a> Iterator for DoubleVectorBatchIterator<'a> {
type Item = Option<f64>;
fn next(&mut self) -> Option<Option<f64>> {
if self.not_null_index >= self.num_elements {
return None;
}
if let Some(not_null) = self.not_null {
let not_null = not_null.as_ptr();
if unsafe { *not_null.offset(self.not_null_index) } == 0 {
self.not_null_index += 1;
return Some(None);
}
}
self.not_null_index += 1;
let datum = unsafe { *self.data.offset(self.data_index) };
self.data_index += 1;
Some(Some(datum))
}
}
/// Iterator over a [`DoubleVectorBatch`] that contains no nulls.
#[derive(Debug, Clone)]
pub struct NotNullDoubleVectorBatchIterator<'a> {
    batch: PhantomData<&'a DoubleVectorBatch<'a>>,
    /// Index of the next value to yield.
    index: isize,
    data: *const f64,
    num_elements: isize,
}

impl<'a> Iterator for NotNullDoubleVectorBatchIterator<'a> {
    type Item = f64;

    fn next(&mut self) -> Option<f64> {
        if self.index < self.num_elements {
            // SAFETY: `index < num_elements`, the buffer length.
            let value = unsafe { *self.data.offset(self.index) };
            self.index += 1;
            Some(value)
        } else {
            None
        }
    }
}
pub struct StringVectorBatch<'a>(&'a ffi::StringVectorBatch);
impl_debug!(StringVectorBatch<'a>, ffi::StringVectorBatch_toString);
impl_upcast!(
StringVectorBatch<'a>,
ffi::StringVectorBatch_into_ColumnVectorBatch
);
impl<'a> StringVectorBatch<'a> {
pub fn iter(&self) -> StringVectorBatchIterator {
let data = ffi::StringVectorBatch_get_data(self.0).data();
let lengths = ffi::StringVectorBatch_get_length(self.0).data();
let vector_batch =
BorrowedColumnVectorBatch(ffi::StringVectorBatch_into_ColumnVectorBatch(self.0));
let num_elements = vector_batch.num_elements();
let not_null = vector_batch.not_null_ptr();
StringVectorBatchIterator {
batch: PhantomData,
index: 0,
data,
not_null,
lengths,
num_elements: num_elements
.try_into()
.expect("could not convert u64 to isize"),
}
}
pub fn try_iter_not_null(&self) -> Option<NotNullStringVectorBatchIterator> {
let data = ffi::StringVectorBatch_get_data(self.0).data();
let lengths = ffi::StringVectorBatch_get_length(self.0).data();
let vector_batch =
BorrowedColumnVectorBatch(ffi::StringVectorBatch_into_ColumnVectorBatch(self.0));
let num_elements = vector_batch.num_elements();
if vector_batch.not_null_ptr().is_some() {
None
} else {
Some(NotNullStringVectorBatchIterator {
batch: PhantomData,
index: 0,
data,
lengths,
num_elements: num_elements
.try_into()
.expect("could not convert u64 to isize"),
})
}
}
pub fn bytes(&self) -> &[u8] {
let data_buffer = ffi::StringVectorBatch_get_blob(self.0);
unsafe {
std::slice::from_raw_parts(
data_buffer.data() as *const u8, data_buffer
.size()
.try_into()
.expect("could not convert u64 to usize"),
)
}
}
pub fn ranges(&self) -> Vec<Option<Range<usize>>> {
let mut ranges = Vec::with_capacity(
self.num_elements()
.try_into()
.expect("could not convert u64 to usize"),
);
let vector_batch =
BorrowedColumnVectorBatch(ffi::StringVectorBatch_into_ColumnVectorBatch(self.0));
let num_elements = vector_batch.num_elements();
let lengths = ffi::StringVectorBatch_get_length(self.0).data();
match vector_batch.not_null_ptr() {
None => {
let mut current_index = 0usize;
for i in 0..num_elements {
let i: isize = i.try_into().expect("could not convert u64 to isize");
let length: usize = unsafe { *lengths.offset(i) }
.try_into()
.expect("could not convert u64 to usize");
let new_index = current_index + length;
ranges.push(Some(current_index..new_index));
current_index = new_index;
}
}
Some(not_null) => {
let not_null = not_null.as_ptr();
let mut current_index = 0usize;
for i in 0..num_elements {
let i: isize = i.try_into().expect("could not convert u64 to isize");
if unsafe { *not_null.offset(i) } == 0 {
ranges.push(None);
} else {
let length: usize = unsafe { *lengths.offset(i) }
.try_into()
.expect("could not convert u64 to usize");
let new_index = current_index + length;
ranges.push(Some(current_index..new_index));
current_index = new_index;
}
}
}
}
ranges
}
}
unsafe impl<'a> Send for StringVectorBatch<'a> {}
/// Iterator over a [`StringVectorBatch`], yielding `None` for null rows.
///
/// String pointers, lengths and null flags are all indexed by row number.
#[derive(Debug, Clone)]
pub struct StringVectorBatchIterator<'a> {
    batch: PhantomData<&'a StringVectorBatch<'a>>,
    /// Row number of the next item.
    index: isize,
    data: *const *mut c_char,
    lengths: *const i64,
    not_null: Option<ptr::NonNull<i8>>,
    num_elements: isize,
}

impl<'a> Iterator for StringVectorBatchIterator<'a> {
    type Item = Option<&'a [u8]>;

    fn next(&mut self) -> Option<Option<&'a [u8]>> {
        if self.index >= self.num_elements {
            return None;
        }
        let row = self.index;
        // SAFETY: `row < num_elements`, the mask length.
        let is_null = self
            .not_null
            .map_or(false, |mask| unsafe { *mask.as_ptr().offset(row) } == 0);
        if is_null {
            self.index += 1;
            return Some(None);
        }
        // SAFETY: `row < num_elements`, the length of both buffers.
        let (start, length) = unsafe { (*self.data.offset(row), *self.lengths.offset(row)) };
        self.index += 1;
        let length: usize = length.try_into().expect("could not convert u64 to usize");
        // SAFETY: relies on the C++ batch pointing each non-null row at `length`
        // readable bytes that live as long as the batch.
        Some(Some(unsafe { std::slice::from_raw_parts(start as *const u8, length) }))
    }
}
/// Iterator over a [`StringVectorBatch`] that contains no nulls.
#[derive(Debug, Clone)]
pub struct NotNullStringVectorBatchIterator<'a> {
    batch: PhantomData<&'a StringVectorBatch<'a>>,
    /// Row number of the next item.
    index: isize,
    data: *const *mut c_char,
    lengths: *const i64,
    num_elements: isize,
}

impl<'a> Iterator for NotNullStringVectorBatchIterator<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<&'a [u8]> {
        if self.index >= self.num_elements {
            return None;
        }
        let row = self.index;
        // SAFETY: `row < num_elements`, the length of both buffers.
        let (start, length) = unsafe { (*self.data.offset(row), *self.lengths.offset(row)) };
        self.index += 1;
        let length: usize = length.try_into().expect("could not convert u64 to usize");
        // SAFETY: relies on each row pointer addressing `length` readable bytes
        // that live as long as the batch.
        Some(unsafe { std::slice::from_raw_parts(start as *const u8, length) })
    }
}
pub struct TimestampVectorBatch<'a>(&'a ffi::TimestampVectorBatch);
impl_debug!(TimestampVectorBatch<'a>, ffi::TimestampVectorBatch_toString);
impl_upcast!(
TimestampVectorBatch<'a>,
ffi::TimestampVectorBatch_into_ColumnVectorBatch
);
impl<'a> TimestampVectorBatch<'a> {
pub fn iter(&self) -> TimestampVectorBatchIterator {
let data = ffi::TimestampVectorBatch_get_data(self.0).data();
let nanoseconds = ffi::TimestampVectorBatch_get_nanoseconds(self.0).data();
let vector_batch =
BorrowedColumnVectorBatch(ffi::TimestampVectorBatch_into_ColumnVectorBatch(self.0));
let num_elements = vector_batch.num_elements();
let not_null = vector_batch.not_null_ptr();
TimestampVectorBatchIterator {
batch: PhantomData,
index: 0,
data,
not_null,
nanoseconds,
num_elements: num_elements
.try_into()
.expect("could not convert u64 to isize"),
}
}
pub fn try_iter_not_null(&self) -> Option<NotNullTimestampVectorBatchIterator> {
let data = ffi::TimestampVectorBatch_get_data(self.0).data();
let nanoseconds = ffi::TimestampVectorBatch_get_nanoseconds(self.0).data();
let vector_batch =
BorrowedColumnVectorBatch(ffi::TimestampVectorBatch_into_ColumnVectorBatch(self.0));
let num_elements = vector_batch.num_elements();
if vector_batch.not_null_ptr().is_some() {
None
} else {
Some(NotNullTimestampVectorBatchIterator {
batch: PhantomData,
index: 0,
data,
nanoseconds,
num_elements: num_elements
.try_into()
.expect("could not convert u64 to isize"),
})
}
}
}
unsafe impl<'a> Send for TimestampVectorBatch<'a> {}
/// Iterator over a [`TimestampVectorBatch`], yielding `(data, nanoseconds)`
/// pairs and `None` for null rows. All buffers are indexed by row number.
#[derive(Debug, Clone)]
pub struct TimestampVectorBatchIterator<'a> {
    batch: PhantomData<&'a TimestampVectorBatch<'a>>,
    /// Row number of the next item.
    index: isize,
    data: *const i64,
    nanoseconds: *const i64,
    not_null: Option<ptr::NonNull<i8>>,
    num_elements: isize,
}

impl<'a> Iterator for TimestampVectorBatchIterator<'a> {
    type Item = Option<(i64, i64)>;

    fn next(&mut self) -> Option<Option<(i64, i64)>> {
        if self.index >= self.num_elements {
            return None;
        }
        let row = self.index;
        self.index += 1;
        // SAFETY: `row < num_elements`, the mask length.
        let is_null = self
            .not_null
            .map_or(false, |mask| unsafe { *mask.as_ptr().offset(row) } == 0);
        if is_null {
            return Some(None);
        }
        // SAFETY: `row < num_elements`, the length of both buffers.
        let pair = unsafe { (*self.data.offset(row), *self.nanoseconds.offset(row)) };
        Some(Some(pair))
    }
}
/// Iterator over a [`TimestampVectorBatch`] that contains no nulls, yielding
/// `(data, nanoseconds)` pairs.
#[derive(Debug, Clone)]
pub struct NotNullTimestampVectorBatchIterator<'a> {
    batch: PhantomData<&'a TimestampVectorBatch<'a>>,
    /// Row number of the next item.
    index: isize,
    data: *const i64,
    nanoseconds: *const i64,
    num_elements: isize,
}

impl<'a> Iterator for NotNullTimestampVectorBatchIterator<'a> {
    type Item = (i64, i64);

    fn next(&mut self) -> Option<(i64, i64)> {
        if self.index < self.num_elements {
            let row = self.index;
            // SAFETY: `row < num_elements`, the length of both buffers.
            let pair = unsafe { (*self.data.offset(row), *self.nanoseconds.offset(row)) };
            self.index += 1;
            Some(pair)
        } else {
            None
        }
    }
}
/// Common interface of the 64-bit and 128-bit decimal vector batches.
pub trait DecimalVectorBatch<'a> {
    /// Iterator yielding one `Option<Decimal>` per row.
    type IteratorType: Iterator<Item = Option<Decimal>>;
    /// Iterator yielding one `Decimal` per row, for batches without nulls.
    type NotNullIteratorType: Iterator<Item = Decimal>;

    /// Iterates over the rows.
    fn iter(&self) -> Self::IteratorType;
    /// Iterates over the rows, or returns `None` if the batch contains nulls.
    fn try_iter_not_null(&self) -> Option<Self::NotNullIteratorType>;

    /// Precision reported by the C++ batch.
    fn precision(&self) -> i32;
    /// Scale reported by the C++ batch.
    fn scale(&self) -> i32;
}
/// Typed view of a C++ `Decimal64VectorBatch`: unscaled 64-bit integers that
/// share one precision and scale.
pub struct Decimal64VectorBatch<'a>(&'a ffi::Decimal64VectorBatch);

impl_debug!(Decimal64VectorBatch<'a>, ffi::Decimal64VectorBatch_toString);
impl_upcast!(
    Decimal64VectorBatch<'a>,
    ffi::Decimal64VectorBatch_into_ColumnVectorBatch
);

impl<'a> Decimal64VectorBatch<'a> {
    /// Returns the batch scale as the `u32` that `rust_decimal` expects.
    ///
    /// # Panics
    ///
    /// Panics if the scale is negative.
    fn unsigned_scale(&self) -> u32 {
        self.scale()
            .try_into()
            .expect("Could not convert scale from i32 to u32")
    }
}

impl<'a> DecimalVectorBatch<'a> for Decimal64VectorBatch<'a> {
    type IteratorType = Decimal64VectorBatchIterator<'a>;
    type NotNullIteratorType = NotNullDecimal64VectorBatchIterator<'a>;

    fn precision(&self) -> i32 {
        ffi::Decimal64VectorBatch_get_precision(self.0)
    }

    fn scale(&self) -> i32 {
        ffi::Decimal64VectorBatch_get_scale(self.0)
    }

    /// Iterates over the rows, yielding `None` for null rows.
    ///
    /// # Panics
    ///
    /// Panics if the row count does not fit in `isize` or the scale is negative.
    fn iter(&self) -> Decimal64VectorBatchIterator<'a> {
        let data = ffi::Decimal64VectorBatch_get_values(self.0).data();
        let num_elements = self.num_elements();
        let not_null = self.not_null_ptr();
        Decimal64VectorBatchIterator {
            batch: PhantomData,
            index: 0,
            data,
            not_null,
            num_elements: num_elements
                .try_into()
                .expect("could not convert u64 to isize"),
            scale: self.unsigned_scale(),
        }
    }

    /// Iterates over the rows without null handling, or returns `None` if the
    /// C++ batch reports nulls.
    ///
    /// # Panics
    ///
    /// Panics if the row count does not fit in `isize` or the scale is negative.
    fn try_iter_not_null(&self) -> Option<NotNullDecimal64VectorBatchIterator<'a>> {
        let data = ffi::Decimal64VectorBatch_get_values(self.0).data();
        let num_elements = self.num_elements();
        if self.not_null_ptr().is_some() {
            return None;
        }
        Some(NotNullDecimal64VectorBatchIterator {
            batch: PhantomData,
            index: 0,
            data,
            num_elements: num_elements
                .try_into()
                .expect("could not convert u64 to isize"),
            scale: self.unsigned_scale(),
        })
    }
}

// NOTE(review): asserts thread-safety of the underlying C++ batch; not
// justified in this file.
unsafe impl<'a> Send for Decimal64VectorBatch<'a> {}

/// Iterator over a [`Decimal64VectorBatch`], yielding `None` for null rows.
///
/// Values and null flags are both indexed by row number: the C++ batch keeps
/// a value slot for every row, including null rows (whose slot content is
/// unspecified).
#[derive(Debug, Clone)]
pub struct Decimal64VectorBatchIterator<'a> {
    batch: PhantomData<&'a Decimal64VectorBatch<'a>>,
    /// Row number of the next item.
    index: isize,
    data: *const i64,
    not_null: Option<ptr::NonNull<i8>>,
    num_elements: isize,
    scale: u32,
}

impl<'a> Iterator for Decimal64VectorBatchIterator<'a> {
    type Item = Option<Decimal>;

    fn next(&mut self) -> Option<Option<Decimal>> {
        if self.index >= self.num_elements {
            return None;
        }
        let row = self.index;
        self.index += 1;
        if let Some(not_null) = self.not_null {
            // SAFETY: `row < num_elements`, and the null mask has one byte per row.
            if unsafe { *not_null.as_ptr().offset(row) } == 0 {
                return Some(None);
            }
        }
        // SAFETY: `row < num_elements`, and the values buffer has one slot per row.
        let unscaled = unsafe { *self.data.offset(row) };
        Some(Some(Decimal::new(unscaled, self.scale)))
    }
}
/// Iterator over a [`Decimal64VectorBatch`] that contains no nulls.
#[derive(Debug, Clone)]
pub struct NotNullDecimal64VectorBatchIterator<'a> {
    batch: PhantomData<&'a Decimal64VectorBatch<'a>>,
    /// Row number of the next item.
    index: isize,
    data: *const i64,
    num_elements: isize,
    scale: u32,
}

impl<'a> Iterator for NotNullDecimal64VectorBatchIterator<'a> {
    type Item = Decimal;

    fn next(&mut self) -> Option<Decimal> {
        if self.index < self.num_elements {
            // SAFETY: `index < num_elements`, the buffer length.
            let unscaled = unsafe { *self.data.offset(self.index) };
            self.index += 1;
            Some(Decimal::new(unscaled, self.scale))
        } else {
            None
        }
    }
}
/// Typed view of a C++ `Decimal128VectorBatch`: unscaled 128-bit integers that
/// share one precision and scale.
pub struct Decimal128VectorBatch<'a>(&'a ffi::Decimal128VectorBatch);

impl_debug!(
    Decimal128VectorBatch<'a>,
    ffi::Decimal128VectorBatch_toString
);
impl_upcast!(
    Decimal128VectorBatch<'a>,
    ffi::Decimal128VectorBatch_into_ColumnVectorBatch
);

impl<'a> Decimal128VectorBatch<'a> {
    /// Returns the batch scale as the `u32` that `rust_decimal` expects.
    ///
    /// # Panics
    ///
    /// Panics if the scale is negative.
    fn unsigned_scale(&self) -> u32 {
        self.scale()
            .try_into()
            .expect("Could not convert scale from i32 to u32")
    }
}

impl<'a> DecimalVectorBatch<'a> for Decimal128VectorBatch<'a> {
    type IteratorType = Decimal128VectorBatchIterator<'a>;
    type NotNullIteratorType = NotNullDecimal128VectorBatchIterator<'a>;

    fn precision(&self) -> i32 {
        ffi::Decimal128VectorBatch_get_precision(self.0)
    }

    fn scale(&self) -> i32 {
        ffi::Decimal128VectorBatch_get_scale(self.0)
    }

    /// Iterates over the rows, yielding `None` for null rows.
    ///
    /// # Panics
    ///
    /// Panics if the row count does not fit in `isize` or the scale is negative.
    fn iter(&self) -> Decimal128VectorBatchIterator<'a> {
        let data = ffi::Decimal128VectorBatch_get_values(self.0).data();
        let num_elements = self.num_elements();
        let not_null = self.not_null_ptr();
        Decimal128VectorBatchIterator {
            batch: PhantomData,
            index: 0,
            data,
            not_null,
            num_elements: num_elements
                .try_into()
                .expect("could not convert u64 to isize"),
            scale: self.unsigned_scale(),
        }
    }

    /// Iterates over the rows without null handling, or returns `None` if the
    /// C++ batch reports nulls.
    ///
    /// # Panics
    ///
    /// Panics if the row count does not fit in `isize` or the scale is negative.
    fn try_iter_not_null(&self) -> Option<NotNullDecimal128VectorBatchIterator<'a>> {
        let data = ffi::Decimal128VectorBatch_get_values(self.0).data();
        let num_elements = self.num_elements();
        if self.not_null_ptr().is_some() {
            return None;
        }
        Some(NotNullDecimal128VectorBatchIterator {
            batch: PhantomData,
            index: 0,
            data,
            num_elements: num_elements
                .try_into()
                .expect("could not convert u64 to isize"),
            scale: self.unsigned_scale(),
        })
    }
}

// NOTE(review): asserts thread-safety of the underlying C++ batch; not
// justified in this file.
unsafe impl<'a> Send for Decimal128VectorBatch<'a> {}

/// Iterator over a [`Decimal128VectorBatch`], yielding `None` for null rows.
///
/// Values and null flags are both indexed by row number: the C++ batch keeps
/// a value slot for every row, including null rows (whose slot content is
/// unspecified).
#[derive(Debug, Clone)]
pub struct Decimal128VectorBatchIterator<'a> {
    batch: PhantomData<&'a Decimal128VectorBatch<'a>>,
    /// Row number of the next item.
    index: isize,
    data: *const memorypool::ffi::Int128,
    not_null: Option<ptr::NonNull<i8>>,
    num_elements: isize,
    scale: u32,
}

impl<'a> Iterator for Decimal128VectorBatchIterator<'a> {
    type Item = Option<Decimal>;

    fn next(&mut self) -> Option<Option<Decimal>> {
        if self.index >= self.num_elements {
            return None;
        }
        let row = self.index;
        self.index += 1;
        if let Some(not_null) = self.not_null {
            // SAFETY: `row < num_elements`, and the null mask has one byte per row.
            if unsafe { *not_null.as_ptr().offset(row) } == 0 {
                return Some(None);
            }
        }
        // The pointer arithmetic goes through `*const i128` to step by 16 bytes
        // per element. NOTE(review): presumably because `Int128` is opaque on
        // the Rust side and has no usable size here; confirm.
        // SAFETY: `row < num_elements`, and the values buffer has one slot per row.
        let value = unsafe {
            &*((self.data as *const i128).offset(row) as *const memorypool::ffi::Int128)
        };
        // NOTE(review): assumes `getLowBits` is unsigned, so the cast zero-extends.
        let unscaled = ((value.getHighBits() as i128) << 64) | (value.getLowBits() as i128);
        Some(Some(Decimal::from_i128_with_scale(unscaled, self.scale)))
    }
}
/// Iterator over a [`Decimal128VectorBatch`] that contains no nulls.
#[derive(Debug, Clone)]
pub struct NotNullDecimal128VectorBatchIterator<'a> {
    batch: PhantomData<&'a Decimal128VectorBatch<'a>>,
    /// Row number of the next item.
    index: isize,
    data: *const memorypool::ffi::Int128,
    num_elements: isize,
    scale: u32,
}

impl<'a> Iterator for NotNullDecimal128VectorBatchIterator<'a> {
    type Item = Decimal;

    fn next(&mut self) -> Option<Decimal> {
        if self.index < self.num_elements {
            let row = self.index;
            // Steps through the buffer as `i128` to get a 16-byte stride, then
            // reinterprets the element as the C++ `Int128`.
            // SAFETY: `row < num_elements`, the buffer length.
            let value = unsafe {
                &*((self.data as *const i128).offset(row) as *const memorypool::ffi::Int128)
            };
            self.index += 1;
            let high = value.getHighBits() as i128;
            let low = value.getLowBits() as i128;
            Some(Decimal::from_i128_with_scale((high << 64) | low, self.scale))
        } else {
            None
        }
    }
}
pub struct ListVectorBatch<'a>(&'a ffi::ListVectorBatch);
impl_debug!(ListVectorBatch<'a>, ffi::ListVectorBatch_toString);
impl_upcast!(
ListVectorBatch<'a>,
ffi::ListVectorBatch_into_ColumnVectorBatch
);
impl<'a> ListVectorBatch<'a> {
pub fn elements(&self) -> BorrowedColumnVectorBatch<'a> {
BorrowedColumnVectorBatch(ffi::ListVectorBatch_get_elements(self.0))
}
pub fn iter_offsets(&self) -> RangeVectorBatchIterator<'a> {
let offsets = ffi::ListVectorBatch_get_offsets(self.0);
let vector_batch =
BorrowedColumnVectorBatch(ffi::ListVectorBatch_into_ColumnVectorBatch(self.0));
let num_elements = vector_batch.num_elements();
let not_null = vector_batch.not_null_ptr();
unsafe { RangeVectorBatchIterator::new(offsets, not_null, num_elements) }
}
pub fn try_iter_offsets_not_null(&self) -> Option<NotNullRangeVectorBatchIterator<'a>> {
let offsets = ffi::ListVectorBatch_get_offsets(self.0);
let vector_batch =
BorrowedColumnVectorBatch(ffi::ListVectorBatch_into_ColumnVectorBatch(self.0));
let num_elements = vector_batch.num_elements();
if vector_batch.not_null_ptr().is_some() {
None
} else {
Some(unsafe { NotNullRangeVectorBatchIterator::new(offsets, num_elements) })
}
}
}
unsafe impl<'a> Send for ListVectorBatch<'a> {}
pub struct MapVectorBatch<'a>(&'a ffi::MapVectorBatch);
impl_debug!(MapVectorBatch<'a>, ffi::MapVectorBatch_toString);
impl_upcast!(
MapVectorBatch<'a>,
ffi::MapVectorBatch_into_ColumnVectorBatch
);
impl<'a> MapVectorBatch<'a> {
pub fn keys(&self) -> BorrowedColumnVectorBatch<'a> {
BorrowedColumnVectorBatch(ffi::MapVectorBatch_get_keys(self.0))
}
pub fn elements(&self) -> BorrowedColumnVectorBatch<'a> {
BorrowedColumnVectorBatch(ffi::MapVectorBatch_get_elements(self.0))
}
pub fn iter_offsets(&self) -> RangeVectorBatchIterator<'a> {
let offsets = ffi::MapVectorBatch_get_offsets(self.0);
let vector_batch =
BorrowedColumnVectorBatch(ffi::MapVectorBatch_into_ColumnVectorBatch(self.0));
let num_elements = vector_batch.num_elements();
let not_null = vector_batch.not_null_ptr();
unsafe { RangeVectorBatchIterator::new(offsets, not_null, num_elements) }
}
pub fn try_iter_offsets_not_null(&self) -> Option<NotNullRangeVectorBatchIterator<'a>> {
let offsets = ffi::MapVectorBatch_get_offsets(self.0);
let vector_batch =
BorrowedColumnVectorBatch(ffi::MapVectorBatch_into_ColumnVectorBatch(self.0));
let num_elements = vector_batch.num_elements();
if vector_batch.not_null_ptr().is_some() {
None
} else {
Some(unsafe { NotNullRangeVectorBatchIterator::new(offsets, num_elements) })
}
}
}
unsafe impl<'a> Send for MapVectorBatch<'a> {}
/// Iterator over the per-row ranges of a list or map batch, yielding `None`
/// for null rows.
///
/// Row `i` covers child entries `offsets[i]..offsets[i + 1]`. The offsets
/// buffer is therefore read at `num_elements + 1` positions and is indexed by
/// row number, null rows included. (A separate counter that skips null rows
/// would return the wrong range for every row after the first null.)
#[derive(Debug, Clone)]
pub struct RangeVectorBatchIterator<'a> {
    batch: PhantomData<&'a LongVectorBatch<'a>>,
    /// Row number of the next item.
    index: isize,
    data: *const i64,
    not_null: Option<ptr::NonNull<i8>>,
    num_elements: isize,
}

impl<'a> RangeVectorBatchIterator<'a> {
    /// # Safety
    ///
    /// `data_buffer` must hold `num_elements + 1` offsets and, if given,
    /// `not_null` must point to `num_elements` bytes; both must stay valid for
    /// `'a`.
    ///
    /// # Panics
    ///
    /// Panics if `num_elements` does not fit in `isize`.
    unsafe fn new(
        data_buffer: &memorypool::ffi::Int64DataBuffer,
        not_null: Option<ptr::NonNull<i8>>,
        num_elements: u64,
    ) -> RangeVectorBatchIterator<'a> {
        RangeVectorBatchIterator {
            batch: PhantomData,
            index: 0,
            data: data_buffer.data(),
            not_null,
            num_elements: num_elements
                .try_into()
                .expect("could not convert u64 to isize"),
        }
    }
}

impl<'a> Iterator for RangeVectorBatchIterator<'a> {
    type Item = Option<Range<usize>>;

    /// # Panics
    ///
    /// Panics if an offset is negative.
    fn next(&mut self) -> Option<Option<Range<usize>>> {
        if self.index >= self.num_elements {
            return None;
        }
        let row = self.index;
        self.index += 1;
        if let Some(not_null) = self.not_null {
            // SAFETY: `row < num_elements`, and the null mask has one byte per row.
            if unsafe { *not_null.as_ptr().offset(row) } == 0 {
                return Some(None);
            }
        }
        // SAFETY: `row + 1 <= num_elements`, and the buffer holds
        // `num_elements + 1` offsets.
        let (start, end) = unsafe { (*self.data.offset(row), *self.data.offset(row + 1)) };
        let start: usize = start.try_into().expect("could not convert i64 to usize");
        let end: usize = end.try_into().expect("could not convert i64 to usize");
        Some(Some(start..end))
    }
}
/// Iterator over the per-row ranges of a list or map batch that contains no
/// nulls. Row `i` covers child entries `offsets[i]..offsets[i + 1]`.
#[derive(Debug, Clone)]
pub struct NotNullRangeVectorBatchIterator<'a> {
    batch: PhantomData<&'a LongVectorBatch<'a>>,
    /// Row number of the next item.
    index: isize,
    data: *const i64,
    num_elements: isize,
}

impl<'a> NotNullRangeVectorBatchIterator<'a> {
    /// # Safety
    ///
    /// `data_buffer` must hold `num_elements + 1` offsets valid for `'a`.
    ///
    /// # Panics
    ///
    /// Panics if `num_elements` does not fit in `isize`.
    unsafe fn new(
        data_buffer: &memorypool::ffi::Int64DataBuffer,
        num_elements: u64,
    ) -> NotNullRangeVectorBatchIterator<'a> {
        NotNullRangeVectorBatchIterator {
            batch: PhantomData,
            index: 0,
            data: data_buffer.data(),
            num_elements: num_elements
                .try_into()
                .expect("could not convert u64 to isize"),
        }
    }
}

impl<'a> Iterator for NotNullRangeVectorBatchIterator<'a> {
    type Item = Range<usize>;

    /// # Panics
    ///
    /// Panics if an offset is negative. Both bounds are now checked; the start
    /// bound previously used an `as usize` cast that silently wrapped.
    fn next(&mut self) -> Option<Range<usize>> {
        if self.index >= self.num_elements {
            return None;
        }
        let row = self.index;
        self.index += 1;
        // SAFETY: `row + 1 <= num_elements`, and the buffer holds
        // `num_elements + 1` offsets.
        let (start, end) = unsafe { (*self.data.offset(row), *self.data.offset(row + 1)) };
        let start: usize = start.try_into().expect("could not convert i64 to usize");
        let end: usize = end.try_into().expect("could not convert i64 to usize");
        Some(start..end)
    }
}