1use anyhow::{Result, anyhow};
2
3use crate::{
4 data::{Value, parse_typed_value},
5 schema::{ColumnType, Schema},
6};
7
8#[derive(Debug, Clone, Copy)]
9pub enum ComparisonOperator {
10 Eq,
11 NotEq,
12 Gt,
13 Ge,
14 Lt,
15 Le,
16 Contains,
17 StartsWith,
18 EndsWith,
19}
20
21#[derive(Debug, Clone)]
22pub struct FilterCondition {
23 pub column: String,
24 pub operator: ComparisonOperator,
25 pub raw_value: String,
26}
27
28pub fn parse_filters(filters: &[String]) -> Result<Vec<FilterCondition>> {
29 filters.iter().map(|f| parse_filter(f)).collect()
30}
31
32fn parse_filter(filter: &str) -> Result<FilterCondition> {
33 let trimmed = filter.trim();
34 if trimmed.is_empty() {
35 return Err(anyhow!("Empty filter expression"));
36 }
37
38 let lowered = trimmed.to_ascii_lowercase();
39 for (needle, op) in [
40 (" contains ", ComparisonOperator::Contains),
41 (" startswith ", ComparisonOperator::StartsWith),
42 (" endswith ", ComparisonOperator::EndsWith),
43 ] {
44 if let Some(idx) = lowered.find(needle) {
45 let (left, right_with_space) = trimmed.split_at(idx);
46 let right = right_with_space[needle.len()..].trim();
47 return Ok(FilterCondition {
48 column: left.trim().to_string(),
49 operator: op,
50 raw_value: unquote(right)?.to_string(),
51 });
52 }
53 }
54
55 for needle in ["!=", ">=", "<=", "=", ">", "<"] {
56 if let Some(idx) = trimmed.find(needle) {
57 let op = match needle {
58 "=" => ComparisonOperator::Eq,
59 "!=" => ComparisonOperator::NotEq,
60 ">" => ComparisonOperator::Gt,
61 ">=" => ComparisonOperator::Ge,
62 "<" => ComparisonOperator::Lt,
63 "<=" => ComparisonOperator::Le,
64 _ => unreachable!(),
65 };
66 let left = trimmed[..idx].trim();
67 let right = trimmed[idx + needle.len()..].trim();
68 return Ok(FilterCondition {
69 column: left.to_string(),
70 operator: op,
71 raw_value: unquote(right)?.to_string(),
72 });
73 }
74 }
75
76 Err(anyhow!("Failed to parse filter expression '{trimmed}'"))
77}
78
79fn unquote(value: &str) -> Result<&str> {
80 if value.len() >= 2 {
81 let bytes = value.as_bytes();
82 if (bytes[0] == b'"' && bytes[value.len() - 1] == b'"')
83 || (bytes[0] == b'\'' && bytes[value.len() - 1] == b'\'')
84 {
85 return Ok(&value[1..value.len() - 1]);
86 }
87 }
88 Ok(value)
89}
90
91pub fn evaluate_conditions(
92 conditions: &[FilterCondition],
93 schema: &Schema,
94 headers: &[String],
95 raw_row: &[String],
96 typed_row: &[Option<Value>],
97) -> Result<bool> {
98 for condition in conditions {
99 let Some(col_index) = schema.column_index(&condition.column).or_else(|| {
100 headers
101 .iter()
102 .position(|header| header == &condition.column)
103 }) else {
104 return Err(anyhow!(
105 "Column '{}' not found for filter",
106 condition.column
107 ));
108 };
109 let column_type = schema
110 .columns
111 .get(col_index)
112 .map(|c| c.datatype.clone())
113 .unwrap_or(ColumnType::String);
114 if !evaluate_condition(
115 condition,
116 &column_type,
117 raw_row.get(col_index).map(|s| s.as_str()),
118 typed_row.get(col_index).and_then(|v| v.as_ref()),
119 )? {
120 return Ok(false);
121 }
122 }
123 Ok(true)
124}
125
126fn evaluate_condition(
127 condition: &FilterCondition,
128 column_type: &ColumnType,
129 raw_value: Option<&str>,
130 typed_value: Option<&Value>,
131) -> Result<bool> {
132 let candidate_typed = match typed_value {
133 Some(value) => Some(value.clone()),
134 None => {
135 if let Some(raw) = raw_value {
136 parse_typed_value(raw, column_type)?
137 } else {
138 None
139 }
140 }
141 };
142
143 use ComparisonOperator::*;
144 match condition.operator {
145 Contains | StartsWith | EndsWith => {
146 let raw = raw_value.unwrap_or("");
147 let needle = condition.raw_value.as_str();
148 let cmp = match condition.operator {
149 Contains => raw.contains(needle),
150 StartsWith => raw.starts_with(needle),
151 EndsWith => raw.ends_with(needle),
152 _ => unreachable!(),
153 };
154 Ok(cmp)
155 }
156 Eq | NotEq | Gt | Ge | Lt | Le => {
157 let rhs_value = parse_typed_value(&condition.raw_value, column_type)?;
158 match (candidate_typed, rhs_value) {
159 (Some(left), Some(right)) => match condition.operator {
160 Eq => Ok(left == right),
161 NotEq => Ok(left != right),
162 Gt => Ok(left > right),
163 Ge => Ok(left >= right),
164 Lt => Ok(left < right),
165 Le => Ok(left <= right),
166 _ => unreachable!(),
167 },
168 (None, None) => Ok(matches!(condition.operator, Eq | Ge | Le)),
169 (None, Some(_)) => Ok(matches!(condition.operator, NotEq)),
170 (Some(_), None) => Ok(matches!(condition.operator, NotEq)),
171 }
172 }
173 }
174}