use std::{
fmt::{self, Debug, Formatter},
marker::PhantomData,
mem::transmute,
sync::Arc,
};
use arrow::{array::RecordBatch, datatypes::Schema};
use parquet::arrow::ProjectionMask;
use crate::{
record::{option::OptionRecordRef, Key, Record, RecordRef, Schema as RecordSchema},
timestamp::Timestamped,
};
pub struct RecordBatchEntry<R>
where
R: Record,
{
_record_batch: RecordBatch,
record_ref: OptionRecordRef<'static, R::Ref<'static>>,
}
impl<R> RecordBatchEntry<R>
where
R: Record,
{
pub(crate) fn new(
_record_batch: RecordBatch,
record_ref: OptionRecordRef<'static, R::Ref<'static>>,
) -> Self {
Self {
_record_batch,
record_ref,
}
}
pub(crate) fn internal_key(
&self,
) -> Timestamped<<<R::Schema as RecordSchema>::Key as Key>::Ref<'_>> {
self.record_ref.key()
}
pub fn key(&self) -> <<R::Schema as RecordSchema>::Key as Key>::Ref<'_> {
self.internal_key().value().clone()
}
pub fn get(&self) -> Option<R::Ref<'_>> {
unsafe { transmute(self.record_ref.get()) }
}
}
impl<R> Debug for RecordBatchEntry<R>
where
R: Record + Debug,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("RecordBatchEntry").finish()
}
}
#[derive(Debug)]
pub struct RecordBatchIterator<R> {
record_batch: RecordBatch,
offset: usize,
projection_mask: ProjectionMask,
full_schema: Arc<Schema>,
_marker: PhantomData<R>,
}
impl<R> RecordBatchIterator<R>
where
R: Record,
{
pub(crate) fn new(
record_batch: RecordBatch,
projection_mask: ProjectionMask,
full_schema: Arc<Schema>,
) -> Self {
Self {
record_batch,
offset: 0,
projection_mask,
full_schema,
_marker: PhantomData,
}
}
}
impl<R> Iterator for RecordBatchIterator<R>
where
R: Record,
{
type Item = RecordBatchEntry<R>;
fn next(&mut self) -> Option<Self::Item> {
if self.offset >= self.record_batch.num_rows() {
return None;
}
let record_batch = self.record_batch.clone();
let record = R::Ref::from_record_batch(
&self.record_batch,
self.offset,
&self.projection_mask,
&self.full_schema,
);
let entry = RecordBatchEntry::new(record_batch, unsafe {
transmute::<OptionRecordRef<'_, R::Ref<'_>>, OptionRecordRef<'static, R::Ref<'static>>>(
record,
)
});
self.offset += 1;
Some(entry)
}
}
#[cfg(test)]
mod tests {}