1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
//! # Execution Plan Validation
//!
//! This module provides validation of execution plans against schemas,
//! ensuring type safety and preventing runtime errors.
use super::compatibility::are_join_compatible;
use super::core::SchemaValidator;
use crate::distributed::execution::{ExecutionPlan, Operation};
use crate::distributed::expr::{ColumnProjection, ExprSchema, ExprValidator};
use crate::error::{Error, Result};
impl SchemaValidator {
/// Validates an execution plan
pub fn validate_plan(&self, plan: &ExecutionPlan) -> Result<()> {
// Get schemas for input datasets
let mut input_schemas = Vec::new();
for input in plan.inputs() {
if let Some(schema) = self.schema(input) {
input_schemas.push(schema);
} else {
return Err(Error::InvalidOperation(format!(
"Schema not found for input dataset: {}",
input
)));
}
}
if input_schemas.is_empty() {
return Err(Error::InvalidOperation(
"No input schemas available for validation".to_string(),
));
}
// Validate operation against schemas
match plan.operations().first() {
None => return Err(Error::InvalidOperation("No operations in plan".to_string())),
Some(operation) => match operation {
Operation::Select(columns) => self.validate_select(input_schemas[0], &columns),
Operation::Filter(predicate) => self.validate_filter(input_schemas[0], &predicate),
Operation::Join {
join_type,
left_keys,
right_keys,
..
} => {
if input_schemas.len() < 2 {
return Err(Error::InvalidOperation(
"Join operation requires at least two input schemas".to_string(),
));
}
self.validate_join(input_schemas[0], input_schemas[1], &left_keys, &right_keys)
}
Operation::GroupBy { keys, aggregates } => {
self.validate_groupby(input_schemas[0], keys, aggregates)
}
Operation::OrderBy(sort_exprs) => {
self.validate_orderby(input_schemas[0], &sort_exprs)
}
Operation::Window(window_functions) => {
self.validate_window(input_schemas[0], &window_functions)
}
Operation::Custom { name, params } => {
match name.as_str() {
"select_expr" => {
if let Some(projections_json) = params.get("projections") {
// Parse projections from JSON
let projections: Vec<ColumnProjection> =
serde_json::from_str(projections_json).map_err(|e| {
Error::DistributedProcessing(format!(
"Failed to parse projections: {}",
e
))
})?;
self.validate_select_expr(input_schemas[0], &projections)
} else {
Err(Error::InvalidOperation(
"select_expr operation requires projections parameter"
.to_string(),
))
}
}
"with_column" => {
let column_name = params.get("column_name").ok_or_else(|| {
Error::InvalidOperation(
"with_column operation requires column_name parameter"
.to_string(),
)
})?;
if let Some(projection_json) = params.get("projection") {
let projection: ColumnProjection =
serde_json::from_str(projection_json).map_err(|e| {
Error::DistributedProcessing(format!(
"Failed to parse projection: {}",
e
))
})?;
self.validate_with_column(
input_schemas[0],
column_name,
&projection,
)
} else {
Err(Error::InvalidOperation(
"with_column operation requires projection parameter"
.to_string(),
))
}
}
"create_udf" => {
// UDF creation doesn't require schema validation
Ok(())
}
_ => {
// Unknown custom operation
Err(Error::NotImplemented(format!(
"Schema validation for custom operation '{}' is not implemented",
name
)))
}
}
}
Operation::Limit { .. } => {
// Limit doesn't require schema validation
Ok(())
}
_ => {
// For other operations, just return Ok for now
Ok(())
}
},
}
}
/// Validates a SELECT operation
fn validate_select(&self, schema: &ExprSchema, columns: &[String]) -> Result<()> {
for column in columns {
if !schema.has_column(column) {
return Err(Error::InvalidOperation(format!(
"Column not found in schema: {}",
column
)));
}
}
Ok(())
}
/// Checks the projections of a SELECT_EXPR operation against `schema`.
///
/// The work is delegated to an [`ExprValidator`] built over `schema`; any
/// error it reports from `validate_projections` is propagated to the caller.
fn validate_select_expr(
    &self,
    schema: &ExprSchema,
    projections: &[ColumnProjection],
) -> Result<()> {
    ExprValidator::new(schema).validate_projections(projections)?;
    Ok(())
}
/// Checks the expression of a WITH_COLUMN operation against `schema`.
///
/// Only `projection.expr` is validated, through an [`ExprValidator`] built
/// over `schema`. The target `column_name` is not inspected.
fn validate_with_column(
    &self,
    schema: &ExprSchema,
    column_name: &str,
    projection: &ColumnProjection,
) -> Result<()> {
    // The new column's name is not currently checked.
    let _ = column_name;
    ExprValidator::new(schema).validate_expr(&projection.expr)?;
    Ok(())
}
/// Validates a FILTER operation
fn validate_filter(&self, schema: &ExprSchema, predicate: &str) -> Result<()> {
// For a simple implementation, we'll just check if it's a valid SQL predicate
// In a more advanced implementation, we'd parse the predicate into an Expr
// and validate it against the schema
// Placeholder for SQL predicate validation
// This is simplified, but can be enhanced with a proper SQL parser
if predicate.is_empty() {
return Err(Error::InvalidOperation(
"Empty predicate in filter operation".to_string(),
));
}
// Basic check for balanced parentheses
let mut paren_count = 0;
for c in predicate.chars() {
if c == '(' {
paren_count += 1;
} else if c == ')' {
paren_count -= 1;
if paren_count < 0 {
return Err(Error::InvalidOperation(format!(
"Unbalanced parentheses in predicate: {}",
predicate
)));
}
}
}
if paren_count != 0 {
return Err(Error::InvalidOperation(format!(
"Unbalanced parentheses in predicate: {}",
predicate
)));
}
Ok(())
}
/// Checks the key columns of a JOIN operation against both input schemas.
///
/// The key lists must have equal length. Keys are then paired positionally;
/// for each pair the left key must exist in `left_schema`, the right key must
/// exist in `right_schema`, and the two column types must satisfy
/// `are_join_compatible`.
///
/// # Errors
///
/// Returns [`Error::InvalidOperation`] describing the first violated rule.
fn validate_join(
    &self,
    left_schema: &ExprSchema,
    right_schema: &ExprSchema,
    left_keys: &[String],
    right_keys: &[String],
) -> Result<()> {
    let (left_count, right_count) = (left_keys.len(), right_keys.len());
    if left_count != right_count {
        return Err(Error::InvalidOperation(format!(
            "Number of left keys ({}) does not match number of right keys ({})",
            left_count, right_count
        )));
    }

    for (lhs, rhs) in left_keys.iter().zip(right_keys) {
        // The left key is resolved first, so a missing left key is reported first.
        let lhs_col = match left_schema.column(lhs) {
            Some(col) => col,
            None => {
                return Err(Error::InvalidOperation(format!(
                    "Left join key not found in schema: {}",
                    lhs
                )))
            }
        };
        let rhs_col = match right_schema.column(rhs) {
            Some(col) => col,
            None => {
                return Err(Error::InvalidOperation(format!(
                    "Right join key not found in schema: {}",
                    rhs
                )))
            }
        };

        if are_join_compatible(&lhs_col.data_type, &rhs_col.data_type) {
            continue;
        }
        return Err(Error::InvalidOperation(format!(
            "Incompatible join key types: {:?} and {:?}",
            lhs_col.data_type, rhs_col.data_type
        )));
    }

    Ok(())
}
/// Validates a GROUP BY operation
fn validate_groupby(
&self,
schema: &ExprSchema,
keys: &[String],
aggregates: &[crate::distributed::execution::AggregateExpr],
) -> Result<()> {
// Check that keys exist in schema
for key in keys {
if !schema.has_column(key) {
return Err(Error::InvalidOperation(format!(
"Grouping key not found in schema: {}",
key
)));
}
}
// Check that aggregated columns exist in schema
for agg in aggregates {
if !schema.has_column(&agg.column) {
return Err(Error::InvalidOperation(format!(
"Aggregated column not found in schema: {}",
agg.column
)));
}
// Check that aggregation function is valid for column type
let col = schema
.column(&agg.column)
.expect("operation should succeed");
match agg.function.as_str() {
"min" | "max" | "sum" | "avg" => {
// These functions require numeric or date columns
match col.data_type {
crate::distributed::expr::ExprDataType::Integer
| crate::distributed::expr::ExprDataType::Float
| crate::distributed::expr::ExprDataType::Date
| crate::distributed::expr::ExprDataType::Timestamp => {
// Valid types for these aggregations
}
_ => {
return Err(Error::InvalidOperation(format!(
"Aggregation function '{}' not supported for column type {:?}",
agg.function, col.data_type
)));
}
}
}
"count" => {
// Count can be applied to any column
}
_ => {
// Unknown aggregation function
return Err(Error::InvalidOperation(format!(
"Unknown aggregation function: {}",
agg.function
)));
}
}
}
Ok(())
}
/// Validates an ORDER BY operation
fn validate_orderby(
&self,
schema: &ExprSchema,
sort_exprs: &[crate::distributed::execution::SortExpr],
) -> Result<()> {
// Check that sort columns exist in schema
for sort_expr in sort_exprs {
if !schema.has_column(&sort_expr.column) {
return Err(Error::InvalidOperation(format!(
"Sort column not found in schema: {}",
sort_expr.column
)));
}
}
Ok(())
}
/// Placeholder validator for WINDOW operations.
///
/// Both arguments are ignored and every input is accepted. Window validation
/// is temporarily disabled because the window module is not enabled.
#[allow(dead_code)]
fn validate_window(
    &self,
    _schema: &ExprSchema,
    // TODO: switch to the WindowFunction type once the window module is enabled.
    _window_functions: &[String],
) -> Result<()> {
    // TODO: add real window function validation once the window module is enabled.
    Ok(())
}
}