re_arrow_combinators/selector/
mod.rs1mod lexer;
9mod parser;
10mod runtime;
11
12use arrow::{array::ListArray, datatypes::DataType};
13use vec1::Vec1;
14
15use parser::{Expr, Segment};
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19pub struct Selector(Expr);
20
21impl std::fmt::Display for Selector {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 write!(f, "{}", self.0)
24 }
25}
26
27impl Selector {
28 pub fn execute_per_row(&self, source: &ListArray) -> Result<ListArray, Error> {
34 runtime::execute_per_row(&self.0, source).map_err(Into::into)
35 }
36}
37
38impl std::str::FromStr for Selector {
39 type Err = Error;
40
41 fn from_str(query: &str) -> Result<Self, Self::Err> {
42 let lexer = lexer::Lexer::new(query);
44 let tokens = lexer.scan_tokens()?;
45
46 let parser = parser::Parser::new(tokens.into_iter());
47 let expr = parser.parse()?;
48
49 Ok(Self(expr))
50 }
51}
52
53impl crate::Transform for Selector {
54 type Source = ListArray;
55 type Target = ListArray;
56
57 fn transform(&self, source: &Self::Source) -> Result<Self::Target, crate::Error> {
58 self.execute_per_row(source).map_err(Into::into)
59 }
60}
61
62impl crate::Transform for &Selector {
63 type Source = ListArray;
64 type Target = ListArray;
65
66 fn transform(&self, source: &Self::Source) -> Result<Self::Target, crate::Error> {
67 self.execute_per_row(source).map_err(Into::into)
68 }
69}
70
71#[derive(Debug, thiserror::Error, Clone)]
73pub enum Error {
74 #[error(transparent)]
76 Lex(#[from] lexer::Error),
77
78 #[error(transparent)]
80 Parse(#[from] parser::Error),
81
82 #[error(transparent)]
84 Runtime(#[from] crate::Error),
85}
86
87pub fn extract_nested_fields<P>(
91 datatype: &DataType,
92 predicate: P,
93) -> Option<Vec1<(Selector, DataType)>>
94where
95 P: Fn(&DataType) -> bool,
96{
97 let DataType::Struct(fields) = datatype else {
98 return None;
99 };
100
101 let mut result = Vec::new();
102 let mut queue = std::collections::VecDeque::new();
103
104 queue.push_back((Vec::new(), fields));
106
107 while let Some((path, fields)) = queue.pop_front() {
109 for field in fields {
110 let mut field_path = path.clone();
111 field_path.push(Segment::Field(field.name().clone()));
112
113 match field.data_type() {
114 DataType::Struct(nested_fields) => {
115 queue.push_back((field_path, nested_fields));
117 }
118 DataType::List(inner) => {
119 field_path.push(Segment::Each);
121
122 match inner.data_type() {
123 DataType::Struct(nested_fields) => {
124 queue.push_back((field_path, nested_fields));
126 }
127 dt if predicate(dt) => {
128 result.push((Selector(Expr::Path(field_path)), dt.clone()));
130 }
131 _ => {}
132 }
133 }
134 dt if predicate(dt) => {
135 result.push((Selector(Expr::Path(field_path)), dt.clone()));
137 }
138 _ => {}
139 }
140 }
141 }
142
143 Vec1::try_from_vec(result).ok()
144}