use arrow::array::ArrayRef;
use arrow::datatypes::DataType;
use crate::datatype::{InfallibleBuild, PrimitiveType, RefType};
use crate::typed_array::TypedArray;
use crate::{ColumnError, Error, ErrorKind, LogicalType};
/// A column of values of logical type `L`: an Arrow array wrapped in a
/// [`TypedArray`], together with string key/value metadata.
pub struct Column<L: LogicalType> {
/// The wrapped array; every element accessor on `Column` delegates to it.
array: TypedArray<L>,
/// Key/value metadata carried with the column. `try_new` starts it empty and
/// `extract_named` fills it from the Arrow field's metadata.
metadata: std::collections::BTreeMap<String, String>,
}
impl<L: LogicalType> Column<L> {
pub const NULLABLE: bool = L::NULLABLE;
pub fn try_new(array: ArrayRef) -> Result<Self, ColumnError> {
Ok(Self {
array: TypedArray::try_new(array)?,
metadata: std::collections::BTreeMap::new(),
})
}
pub fn from_record_batch_and_name(
batch: &arrow::record_batch::RecordBatch,
name: &str,
) -> Result<Self, Error> {
Self::extract_named(batch, name, "Column")
}
pub(crate) fn extract_named(
batch: &arrow::record_batch::RecordBatch,
name: &str,
record_type: &'static str,
) -> Result<Self, Error> {
let (index, field) = batch
.schema_ref()
.column_with_name(name)
.ok_or_else(|| Error {
record_type,
kind: ErrorKind::MissingColumn {
column: name.to_owned(),
},
})?;
let column = Self::try_new(ArrayRef::clone(batch.column(index))).map_err(|err| Error {
record_type,
kind: err.for_column(name.to_owned()),
})?;
Ok(column.with_metadata(
field
.metadata()
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect(),
))
}
#[must_use]
pub fn metadata(&self) -> &std::collections::BTreeMap<String, String> {
&self.metadata
}
pub fn metadata_mut(&mut self) -> &mut std::collections::BTreeMap<String, String> {
&mut self.metadata
}
#[must_use]
pub fn with_metadata(mut self, metadata: std::collections::BTreeMap<String, String>) -> Self {
self.metadata = metadata;
self
}
#[must_use]
pub fn len(&self) -> usize {
self.array.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.array.is_empty()
}
#[must_use]
#[inline]
pub fn get(&self, index: usize) -> Option<L::Value<'_>> {
self.array.get(index)
}
#[must_use]
pub fn get_owned(&self, index: usize) -> Option<L::Owned> {
self.get(index).map(L::to_owned_value)
}
#[must_use]
#[inline]
pub fn value(&self, index: usize) -> L::Value<'_> {
self.array.value(index)
}
#[must_use]
pub fn value_owned(&self, index: usize) -> L::Owned {
L::to_owned_value(self.value(index))
}
#[must_use]
pub fn iter(&self) -> ColumnIter<'_, L> {
ColumnIter {
column: self,
index: 0,
end: self.len(),
}
}
pub fn iter_owned(&self) -> impl Iterator<Item = L::Owned> + '_ {
self.iter().map(L::to_owned_value)
}
#[must_use]
pub fn into_iter_owned(self) -> ColumnIntoIter<L> {
let end = self.len();
ColumnIntoIter {
column: self,
index: 0,
end,
}
}
#[must_use]
pub fn to_vec(&self) -> Vec<L::Owned> {
self.iter_owned().collect()
}
#[must_use]
pub fn slice(&self, offset: usize, length: usize) -> Self {
Self::try_new(self.array.as_arrow().slice(offset, length))
.expect("Cannot fail: slicing preserves datatype and validity")
.with_metadata(self.metadata.clone())
}
#[must_use]
pub fn as_arrow(&self) -> &ArrayRef {
self.array.as_arrow()
}
#[must_use]
pub fn into_arrow(self) -> ArrayRef {
self.array.into_arrow()
}
}
impl<L: crate::ConcreteType> Column<L> {
    /// Builds a column from an iterator of values convertible into `L::Owned`.
    ///
    /// Each value is wrapped in `Some` before being handed to `L::build`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `L::build` or from [`Self::try_new`].
    pub fn try_from_values(
        values: impl IntoIterator<Item = impl Into<L::Owned>>,
    ) -> Result<Self, ColumnError> {
        Self::try_new(L::build(
            values.into_iter().map(|item| Some(item.into())),
        )?)
    }

    /// Returns the Arrow datatype of the logical type `L`.
    #[must_use]
    pub fn datatype() -> DataType {
        L::datatype()
    }
}
/// `column[i]` yields a reference to element `i` for logical types that can
/// hand out references (`RefType`).
impl<L: RefType> std::ops::Index<usize> for Column<L> {
    type Output = L::Ref;

    /// Delegates to `TypedArray::value_ref`.
    ///
    /// NOTE(review): an out-of-range `index` presumably panics there — confirm.
    fn index(&self, index: usize) -> &Self::Output {
        let typed = &self.array;
        typed.value_ref(index)
    }
}
impl<L: PrimitiveType> Column<L> {
    /// Exposes the column's values as a slice of `L::Native`, as returned by
    /// `TypedArray::values`.
    #[must_use]
    pub fn as_slice(&self) -> &[L::Native] {
        let typed = &self.array;
        typed.values()
    }
}
impl<L: LogicalType + InfallibleBuild> Column<L> {
    /// Builds a column from values convertible into `L::Owned`.
    ///
    /// # Panics
    ///
    /// Panics if [`Column::try_from_values`] returns an error, which the
    /// `InfallibleBuild` bound is meant to rule out.
    pub fn from_values(values: impl IntoIterator<Item = impl Into<L::Owned>>) -> Self {
        let built = Self::try_from_values(values);
        built.expect("Cannot fail: the logical type is InfallibleBuild")
    }
}
impl<L: crate::ConcreteType> Column<Option<L>> {
    /// Builds a nullable column from optional values, converting each present
    /// value into `L::Owned`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Column::try_from_values`].
    pub fn try_from_nullable_values(
        values: impl IntoIterator<Item = Option<impl Into<L::Owned>>>,
    ) -> Result<Self, ColumnError> {
        let converted = values.into_iter().map(|value| value.map(Into::into));
        Self::try_from_values(converted)
    }
}
impl<L: InfallibleBuild> Column<Option<L>> {
    /// Builds a nullable column from optional values, converting each present
    /// value into `L::Owned`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`Column::from_values`].
    pub fn from_nullable_values(
        values: impl IntoIterator<Item = Option<impl Into<L::Owned>>>,
    ) -> Self {
        let converted = values.into_iter().map(|value| value.map(Into::into));
        Self::from_values(converted)
    }
}
impl<L: InfallibleBuild, T: Into<L::Owned>> From<Vec<T>> for Column<L> {
fn from(values: Vec<T>) -> Self {
Self::from_values(values)
}
}
impl<L: InfallibleBuild, T: Into<L::Owned>> FromIterator<T> for Column<L> {
fn from_iter<I: IntoIterator<Item = T>>(values: I) -> Self {
Self::from_values(values)
}
}
/// The default column is an empty array of `L`'s datatype with no metadata.
impl<L: crate::ConcreteType> Default for Column<L> {
    fn default() -> Self {
        let datatype = L::datatype();
        let empty = arrow::array::new_empty_array(&datatype);
        Self::try_new(empty).expect("An empty array of the right datatype is always valid")
    }
}
/// Two columns are equal when both their metadata and their arrays are equal.
impl<L: LogicalType> PartialEq for Column<L> {
    fn eq(&self, other: &Self) -> bool {
        // Metadata is compared first; the arrays are only compared if it matches.
        if self.metadata != other.metadata {
            return false;
        }
        self.array == other.array
    }
}
/// Written by hand so that cloning does not require `L: Clone`.
impl<L: LogicalType> Clone for Column<L> {
    fn clone(&self) -> Self {
        let array = self.array.clone();
        let metadata = self.metadata.clone();
        Self { array, metadata }
    }
}
/// Formats the column as a `Column { array, metadata, .. }` struct, printing
/// the underlying Arrow array rather than the typed wrapper.
impl<L: LogicalType> std::fmt::Debug for Column<L> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut output = f.debug_struct("Column");
        output.field("array", self.array.as_arrow());
        output.field("metadata", &self.metadata);
        output.finish_non_exhaustive()
    }
}
impl<L: LogicalType> TryFrom<ArrayRef> for Column<L> {
type Error = ColumnError;
fn try_from(array: ArrayRef) -> Result<Self, Self::Error> {
Self::try_new(array)
}
}
/// Borrowing iterator over the elements of a [`Column`], created by
/// [`Column::iter`].
///
/// Covers the positions `index..end` and can be advanced from either end.
pub struct ColumnIter<'a, L: LogicalType> {
/// The column being iterated.
column: &'a Column<L>,
/// Position of the next element yielded from the front.
index: usize,
/// One past the position of the next element yielded from the back.
end: usize,
}
// SAFETY note for this impl: every `value_unchecked` call below uses a position
// `p` with `index <= p < end`. `Column::iter` sets `end` to `column.len()` and
// nothing here ever increases it, so `p` is in bounds.
// NOTE(review): this assumes an in-bounds index is the whole contract of
// `TypedArray::value_unchecked` — confirm against its `# Safety` section.
impl<'a, L: LogicalType + 'a> Iterator for ColumnIter<'a, L> {
    type Item = L::Value<'a>;

    /// Yields the element at the front position and advances past it.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.index >= self.end {
            return None;
        }
        let current = self.index;
        self.index = current + 1;
        // SAFETY: `current < end <= column.len()` (see the impl-level note).
        Some(unsafe { self.column.array.value_unchecked(current) })
    }

    /// Exact: the number of positions left in `index..end`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let left = self.end - self.index;
        (left, Some(left))
    }

    /// Constant-time count; no element is read.
    fn count(self) -> usize {
        let Self { index, end, .. } = self;
        end - index
    }

    /// Constant-time access to the final remaining element.
    fn last(self) -> Option<Self::Item> {
        if self.index < self.end {
            // SAFETY: the range is non-empty, so `end - 1` lies in `index..end`.
            Some(unsafe { self.column.array.value_unchecked(self.end - 1) })
        } else {
            None
        }
    }

    /// Skips `n` elements in constant time, then yields the next one.
    ///
    /// If fewer than `n + 1` elements remain (or `index + n` overflows), the
    /// iterator is exhausted and `None` is returned.
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        let limit = self.end;
        let target = self.index.checked_add(n).filter(|&position| position < limit);
        if let Some(position) = target {
            self.index = position + 1;
            // SAFETY: `position < end` was checked by the filter above.
            Some(unsafe { self.column.array.value_unchecked(position) })
        } else {
            self.index = limit;
            None
        }
    }

    /// Folds over the remaining positions in order.
    fn fold<B, F>(self, init: B, mut f: F) -> B
    where
        F: FnMut(B, Self::Item) -> B,
    {
        let Self { column, index, end } = self;
        (index..end).fold(init, |acc, position| {
            // SAFETY: `position` comes from `index..end`.
            f(acc, unsafe { column.array.value_unchecked(position) })
        })
    }
}
impl<'a, L: LogicalType + 'a> DoubleEndedIterator for ColumnIter<'a, L> {
    /// Yields the element just before `end` and shrinks the range from the back.
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.index >= self.end {
            return None;
        }
        self.end -= 1;
        // SAFETY: after the decrement `index <= end`, and `end` is below the
        // value `Column::iter` initialised it with (`column.len()`).
        // NOTE(review): assumes `value_unchecked` only requires an in-bounds
        // index — confirm.
        Some(unsafe { self.column.array.value_unchecked(self.end) })
    }
}
// `size_hint` reports exactly `end - index` for both bounds, so the default
// `ExactSizeIterator::len` is accurate.
impl<'a, L: LogicalType + 'a> ExactSizeIterator for ColumnIter<'a, L> {}
// Once `index` reaches `end`, `next` and `next_back` keep returning `None`
// without changing any state, so the iterator is fused.
impl<'a, L: LogicalType + 'a> std::iter::FusedIterator for ColumnIter<'a, L> {}
impl<'a, L: LogicalType + 'a> IntoIterator for &'a Column<L> {
type Item = L::Value<'a>;
type IntoIter = ColumnIter<'a, L>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
/// Owning iterator over the elements of a [`Column`], created by
/// [`Column::into_iter_owned`].
///
/// Covers the positions `index..end` and yields `L::Owned` values.
pub struct ColumnIntoIter<L: LogicalType> {
/// The column being consumed; it stays alive for as long as the iterator does.
column: Column<L>,
/// Position of the next element yielded from the front.
index: usize,
/// One past the position of the next element yielded from the back.
end: usize,
}
// SAFETY note for this impl: every `value_unchecked` call below uses a position
// `p` with `index <= p < end`. `Column::into_iter_owned` sets `end` to the
// column's length and nothing here ever increases it, so `p` is in bounds.
// NOTE(review): this assumes an in-bounds index is the whole contract of
// `TypedArray::value_unchecked` — confirm against its `# Safety` section.
impl<L: LogicalType> Iterator for ColumnIntoIter<L> {
    type Item = L::Owned;

    /// Yields the owned form of the element at the front position.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.index < self.end {
            // SAFETY: `index < end <= column.len()` (see the impl-level note).
            let value = unsafe { self.column.array.value_unchecked(self.index) };
            let value = L::to_owned_value(value);
            self.index += 1;
            Some(value)
        } else {
            None
        }
    }

    /// Exact: the number of positions left in `index..end`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.end - self.index;
        (remaining, Some(remaining))
    }

    /// Constant-time count, matching `ColumnIter::count`.
    ///
    /// The default implementation would convert every remaining element with
    /// `L::to_owned_value` only to throw the result away.
    fn count(self) -> usize {
        self.end - self.index
    }

    /// Constant-time access to the final remaining element, matching
    /// `ColumnIter::last`; only that one element is converted to its owned form.
    fn last(self) -> Option<Self::Item> {
        if self.index < self.end {
            // SAFETY: the range is non-empty, so `end - 1` lies in `index..end`.
            let value = unsafe { self.column.array.value_unchecked(self.end - 1) };
            Some(L::to_owned_value(value))
        } else {
            None
        }
    }

    /// Skips `n` elements in constant time (without converting them), then
    /// yields the next one.
    ///
    /// If fewer than `n + 1` elements remain (or `index + n` overflows), the
    /// iterator is exhausted and `None` is returned.
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        match self.index.checked_add(n) {
            Some(target) if target < self.end => {
                self.index = target + 1;
                // SAFETY: `target < end` was checked by the match guard.
                Some(L::to_owned_value(unsafe {
                    self.column.array.value_unchecked(target)
                }))
            }
            _ => {
                self.index = self.end;
                None
            }
        }
    }
}
impl<L: LogicalType> DoubleEndedIterator for ColumnIntoIter<L> {
    /// Yields the owned form of the element just before `end` and shrinks the
    /// range from the back.
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.index >= self.end {
            return None;
        }
        self.end -= 1;
        // SAFETY: after the decrement `index <= end`, and `end` is below the
        // value `Column::into_iter_owned` initialised it with (the column's
        // length).
        // NOTE(review): assumes `value_unchecked` only requires an in-bounds
        // index — confirm.
        let borrowed = unsafe { self.column.array.value_unchecked(self.end) };
        Some(L::to_owned_value(borrowed))
    }
}
// `size_hint` reports exactly `end - index` for both bounds, so the default
// `ExactSizeIterator::len` is accurate.
impl<L: LogicalType> ExactSizeIterator for ColumnIntoIter<L> {}
// Once `index` reaches `end`, `next` and `next_back` keep returning `None`
// without changing any state, so the iterator is fused.
impl<L: LogicalType> std::iter::FusedIterator for ColumnIntoIter<L> {}