use arrow::array::{Array as _, FixedSizeListArray, ListArray};
use crate::{
Transform as _,
index::GetIndexList,
map::MapList,
reshape::{Flatten, GetField, PromoteInnerNulls},
};
use super::parser::{Expr, Segment, SegmentKind};
pub fn execute_per_row(expr: &Expr, source: &ListArray) -> Result<Option<ListArray>, crate::Error> {
let result = expr.execute(source)?;
if let Some(ref result) = result {
re_log::debug_assert_eq!(
result.len(),
source.len(),
"selectors should never change row count"
);
}
Ok(result)
}
fn values_downcasts_to<T: 'static>(array: &ListArray) -> bool {
array.values().as_any().downcast_ref::<T>().is_some()
}
impl SegmentKind {
fn execute(&self, source: &ListArray) -> Result<Option<ListArray>, crate::Error> {
match self {
Self::Field(field_name) => {
MapList::new(GetField::new(field_name.clone())).transform(source)
}
Self::Index(index) => MapList::new(GetIndexList::new(*index)).transform(source),
Self::Each => {
if values_downcasts_to::<ListArray>(source)
|| values_downcasts_to::<FixedSizeListArray>(source)
{
Flatten::new().transform(source)
} else {
Err(crate::Error::TypeMismatch {
expected: "ListArray".into(),
actual: source.value_type(),
context: "Each ([]) operator requires nested lists".into(),
})
}
}
}
}
}
impl Segment {
fn execute(&self, source: &ListArray) -> Result<Option<ListArray>, crate::Error> {
let result = match self.kind.execute(source) {
Ok(result) => result,
Err(err) if self.suppressed => {
re_log::trace!("Suppressed segment `{self}` suppressed error: {err}");
return Ok(None);
}
Err(err) => return Err(err),
};
let Some(result) = result else {
return Ok(None);
};
if self.assert_non_null {
PromoteInnerNulls.transform(&result)
} else {
Ok(Some(result))
}
}
}
impl Expr {
fn execute(&self, source: &ListArray) -> Result<Option<ListArray>, crate::Error> {
match self {
Self::Identity => Ok(Some(source.clone())),
Self::Path(segments) => {
let mut result = source.clone();
for segment in segments {
match segment.execute(&result)? {
Some(next) => result = next,
None => return Ok(None),
}
}
Ok(Some(result))
}
Self::Pipe(left, right) => match left.as_ref().execute(source)? {
Some(intermediate) => right.as_ref().execute(&intermediate),
None => Ok(None),
},
}
}
}