mod lexer;
mod parser;
mod runtime;
use arrow::{
array::ListArray,
datatypes::{DataType, Fields},
};
use vec1::Vec1;
use parser::{Expr, Segment, SegmentKind};
/// A selector query, stored as its parsed expression tree.
///
/// This is a newtype around the parser's [`Expr`]. Build one with
/// [`Selector::parse`] or through the `FromStr` implementation (`str::parse`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Selector(Expr);
impl std::fmt::Display for Selector {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl Selector {
pub fn parse(query: &str) -> Result<Self, Error> {
query.parse()
}
pub fn execute_per_row(&self, source: &ListArray) -> Result<Option<ListArray>, Error> {
runtime::execute_per_row(&self.0, source).map_err(Into::into)
}
}
/// Builds a [`Selector`] from query text in two stages: lexing, then parsing.
impl std::str::FromStr for Selector {
    type Err = Error;

    /// Tokenizes `query` and parses the tokens into an expression.
    ///
    /// # Errors
    ///
    /// Lexer and parser failures are converted into [`Error`] via `?`.
    fn from_str(query: &str) -> Result<Self, Self::Err> {
        // Stage 1: turn the raw text into tokens.
        let scanner = lexer::Lexer::new(query);
        let token_stream = scanner.scan_tokens()?.into_iter();

        // Stage 2: parse the token stream into an expression tree.
        let expr_parser = parser::Parser::new(token_stream);
        Ok(Self(expr_parser.parse()?))
    }
}
impl crate::Transform for Selector {
type Source = ListArray;
type Target = ListArray;
fn transform(&self, source: &Self::Source) -> Result<Option<Self::Target>, crate::Error> {
self.execute_per_row(source).map_err(crate::Error::from)
}
}
/// Lets a borrowed [`Selector`] be used wherever a `Transform` is expected.
impl crate::Transform for &Selector {
    type Source = ListArray;
    type Target = ListArray;

    /// Forwards to the `Transform` implementation of the referenced [`Selector`].
    fn transform(&self, source: &Self::Source) -> Result<Option<Self::Target>, crate::Error> {
        // `self` is `&&Selector` here; peel one layer and reuse the owned impl.
        <Selector as crate::Transform>::transform(*self, source)
    }
}
/// Errors that can occur while building or executing a [`Selector`].
///
/// Every variant is `transparent`: it wraps the underlying error and can be
/// created from it through the generated `From` implementation.
#[derive(Debug, thiserror::Error, Clone)]
pub enum Error {
/// Failure reported by the lexer.
#[error(transparent)]
Lex(#[from] lexer::Error),
/// Failure reported by the parser.
#[error(transparent)]
Parse(#[from] parser::Error),
/// A crate-level error wrapped as a selector error
/// (presumably raised while executing the selector; confirm against `runtime`).
#[error(transparent)]
Runtime(#[from] crate::Error),
}
/// Inspects one datatype reached via `path` and records it or schedules its children.
///
/// * If `predicate` accepts `datatype`, a selector for `path` is appended to
///   `result` and the type is not descended into any further.
/// * A struct is pushed onto `queue` so the caller can visit its fields later.
/// * A `List` / `FixedSizeList` appends an "each" segment to `path` and then
///   inspects the element type: a matching element is recorded, a struct element
///   is queued, and a `FixedSizeList` element is probed exactly one level deeper
///   (recorded with a second "each" segment if its own element matches).
/// * Every other datatype is ignored.
fn process_datatype<'a, P>(
    mut path: Vec<Segment>,
    datatype: &'a DataType,
    predicate: &P,
    result: &mut Vec<(Selector, DataType)>,
    queue: &mut std::collections::VecDeque<(Vec<Segment>, &'a Fields)>,
) where
    P: Fn(&DataType) -> bool,
{
    /// Plain "iterate every element" segment with no modifiers set.
    fn each() -> Segment {
        Segment {
            kind: SegmentKind::Each,
            suppressed: false,
            assert_non_null: false,
        }
    }

    // A direct match wins over any structural descent.
    if predicate(datatype) {
        result.push((Selector(Expr::Path(path)), datatype.clone()));
        return;
    }

    // Only list-like types continue below; structs are deferred to the caller's queue.
    let element = match datatype {
        DataType::Struct(fields) => {
            queue.push_back((path, fields));
            return;
        }
        DataType::List(inner) | DataType::FixedSizeList(inner, ..) => inner.data_type(),
        _ => return,
    };

    path.push(each());

    if predicate(element) {
        result.push((Selector(Expr::Path(path)), element.clone()));
        return;
    }

    match element {
        DataType::Struct(nested_fields) => queue.push_back((path, nested_fields)),
        DataType::FixedSizeList(field, ..) => {
            // A fixed-size list nested in a list is only recorded when its own
            // element type matches; nothing is queued from this level.
            let leaf = field.data_type();
            if predicate(leaf) {
                path.push(each());
                result.push((Selector(Expr::Path(path)), leaf.clone()));
            }
        }
        _ => {}
    }
}
pub fn extract_nested_fields<P>(
datatype: &DataType,
predicate: P,
) -> Option<Vec1<(Selector, DataType)>>
where
P: Fn(&DataType) -> bool,
{
let mut result = Vec::new();
let mut queue = std::collections::VecDeque::new();
match datatype {
DataType::Struct(_) | DataType::List(_) | DataType::FixedSizeList(..) => {
process_datatype(Vec::new(), datatype, &predicate, &mut result, &mut queue);
}
_ => return None,
}
while let Some((path, fields)) = queue.pop_front() {
for field in fields {
let mut field_path = path.clone();
field_path.push(Segment {
kind: SegmentKind::Field(field.name().clone()),
suppressed: false,
assert_non_null: false,
});
process_datatype(
field_path,
field.data_type(),
&predicate,
&mut result,
&mut queue,
);
}
}
Vec1::try_from_vec(result).ok()
}