data_transform/parser/
mod.rs

1pub mod ast;
2
3use pest::Parser;
4use pest_derive::Parser;
5
6use ast::*;
7use crate::error::{DtransformError, Result};
8
/// Parser type whose implementation is generated by `pest_derive` from the
/// grammar file named in the `grammar` attribute (`parser/grammar.pest`).
/// The derive also produces the `Rule` enum used throughout this module.
#[derive(Parser)]
#[grammar = "parser/grammar.pest"]
pub struct DtransformParser;
12
13// Parse a multi-statement program (for files/CLI)
14pub fn parse_program(input: &str) -> Result<Program> {
15    let pairs = DtransformParser::parse(Rule::program, input)
16        .map_err(|e| DtransformError::PestError(e.to_string()))?;
17
18    let program_pair = pairs.into_iter().next().unwrap();
19    parse_program_inner(program_pair)
20}
21
22// Parse a single statement (for REPL)
23pub fn parse(input: &str) -> Result<Statement> {
24    let pairs = DtransformParser::parse(Rule::statement, input)
25        .map_err(|e| DtransformError::PestError(e.to_string()))?;
26
27    let statement_pair = pairs.into_iter().next().unwrap();
28    parse_statement(statement_pair)
29}
30
31fn parse_program_inner(pair: pest::iterators::Pair<Rule>) -> Result<Program> {
32    let mut statements = Vec::new();
33
34    for inner in pair.into_inner() {
35        if inner.as_rule() == Rule::statement_inner {
36            statements.push(parse_statement_inner(inner)?);
37        }
38    }
39
40    Ok(Program { statements })
41}
42
43fn parse_statement(pair: pest::iterators::Pair<Rule>) -> Result<Statement> {
44    let inner = pair.into_inner().next().unwrap();
45    parse_statement_inner(inner)
46}
47
48fn parse_statement_inner(pair: pest::iterators::Pair<Rule>) -> Result<Statement> {
49    let inner = pair.into_inner().next().unwrap();
50
51    match inner.as_rule() {
52        Rule::assignment => {
53            let mut inner_pairs = inner.into_inner();
54            let name = inner_pairs.next().unwrap().as_str().to_string();
55            let pipeline = parse_pipeline(inner_pairs.next().unwrap())?;
56            Ok(Statement::Assignment { name, pipeline })
57        }
58        Rule::pipeline => {
59            let pipeline = parse_pipeline(inner)?;
60            Ok(Statement::Pipeline(pipeline))
61        }
62        _ => Err(DtransformError::ParseError(format!("Unexpected rule: {:?}", inner.as_rule())))
63    }
64}
65
66fn parse_pipeline(pair: pest::iterators::Pair<Rule>) -> Result<Pipeline> {
67    let mut operations = Vec::new();
68    let mut source = None;
69
70    for inner_pair in pair.into_inner() {
71        match inner_pair.as_rule() {
72            Rule::operation => {
73                operations.push(parse_operation(inner_pair)?);
74            }
75            _ => {}
76        }
77    }
78
79    // Extract source from first operation if it's a read or variable
80    if !operations.is_empty() {
81        match &operations[0] {
82            Operation::Read(read_op) => {
83                source = Some(Source::Read(read_op.clone()));
84                operations.remove(0);
85            }
86            Operation::Variable(var_name) => {
87                source = Some(Source::Variable(var_name.clone()));
88                operations.remove(0);
89            }
90            _ => {}
91        }
92    }
93
94    Ok(Pipeline { source, operations })
95}
96
97
98fn parse_operation(pair: pest::iterators::Pair<Rule>) -> Result<Operation> {
99    let inner = pair.into_inner().next().unwrap();
100
101    match inner.as_rule() {
102        Rule::read_op => Ok(Operation::Read(parse_read_op(inner)?)),
103        Rule::write_op => Ok(Operation::Write(parse_write_op(inner)?)),
104        Rule::select_op => Ok(Operation::Select(parse_select_op(inner)?)),
105        Rule::filter_op => Ok(Operation::Filter(parse_filter_op(inner)?)),
106        Rule::mutate_op => Ok(Operation::Mutate(parse_mutate_op(inner)?)),
107        Rule::rename_op => Ok(Operation::Rename(parse_rename_op(inner)?)),
108        Rule::rename_all_op => Ok(Operation::RenameAll(parse_rename_all_op(inner)?)),
109        Rule::sort_op => Ok(Operation::Sort(parse_sort_op(inner)?)),
110        Rule::take_op => Ok(Operation::Take(parse_take_op(inner)?)),
111        Rule::skip_op => Ok(Operation::Skip(parse_skip_op(inner)?)),
112        Rule::slice_op => Ok(Operation::Slice(parse_slice_op(inner)?)),
113        Rule::drop_op => Ok(Operation::Drop(parse_drop_op(inner)?)),
114        Rule::distinct_op => Ok(Operation::Distinct(parse_distinct_op(inner)?)),
115        Rule::variable_ref => {
116            // This is a variable reference used as a source
117            Ok(Operation::Variable(inner.as_str().trim().to_string()))
118        }
119        _ => Err(DtransformError::ParseError(format!("Unknown operation: {:?}", inner.as_rule())))
120    }
121}
122
123fn parse_read_op(pair: pest::iterators::Pair<Rule>) -> Result<ReadOp> {
124    let mut inner_pairs = pair.into_inner();
125    let path = parse_string(inner_pairs.next().unwrap())?;
126
127    let mut format = None;
128    let mut delimiter = None;
129    let mut header = None;
130    let mut skip_rows = None;
131    let mut trim_whitespace = None;
132
133    if let Some(params_pair) = inner_pairs.next() {
134        for param in params_pair.into_inner() {
135            let mut param_inner = param.into_inner();
136            let name = param_inner.next().unwrap().as_str();
137            let value = param_inner.next().unwrap();
138
139            match name {
140                "format" => format = Some(parse_param_value(value)?),
141                "delimiter" => {
142                    let delim_str = parse_param_value(value)?;
143                    delimiter = delim_str.chars().next();
144                }
145                "header" => {
146                    let header_str = parse_param_value(value)?;
147                    header = Some(header_str == "true");
148                }
149                "skip_rows" => {
150                    let skip_str = parse_param_value(value)?;
151                    skip_rows = Some(skip_str.parse::<usize>().map_err(|_| {
152                        DtransformError::ParseError(format!("Invalid skip_rows value: {}", skip_str))
153                    })?);
154                }
155                "trim_whitespace" => {
156                    let trim_str = parse_param_value(value)?;
157                    trim_whitespace = Some(trim_str == "true");
158                }
159                _ => {}
160            }
161        }
162    }
163
164    Ok(ReadOp { path, format, delimiter, header, skip_rows, trim_whitespace })
165}
166
167fn parse_write_op(pair: pest::iterators::Pair<Rule>) -> Result<WriteOp> {
168    let mut inner_pairs = pair.into_inner();
169    let path = parse_string(inner_pairs.next().unwrap())?;
170
171    let mut format = None;
172    let mut header = None;
173    let mut delimiter = None;
174
175    if let Some(params_pair) = inner_pairs.next() {
176        for param in params_pair.into_inner() {
177            let mut param_inner = param.into_inner();
178            let name = param_inner.next().unwrap().as_str();
179            let value = param_inner.next().unwrap();
180
181            match name {
182                "format" => format = Some(parse_param_value(value)?),
183                "header" => header = Some(parse_param_value(value)? == "true"),
184                "delimiter" => {
185                    let delim_str = parse_param_value(value)?;
186                    delimiter = delim_str.chars().next();
187                }
188                _ => {}
189            }
190        }
191    }
192
193    Ok(WriteOp { path, format, header, delimiter })
194}
195
196fn parse_select_op(pair: pest::iterators::Pair<Rule>) -> Result<SelectOp> {
197    let mut selectors = Vec::new();
198
199    for inner_pair in pair.into_inner() {
200        if inner_pair.as_rule() == Rule::selector_list {
201            for selector_item_pair in inner_pair.into_inner() {
202                let (selector, alias) = parse_selector_item(selector_item_pair)?;
203                selectors.push((selector, alias));
204            }
205        }
206    }
207
208    Ok(SelectOp { selectors })
209}
210
211fn parse_selector_item(pair: pest::iterators::Pair<Rule>) -> Result<(ColumnSelector, Option<String>)> {
212    let inner = pair.into_inner().next().unwrap();
213
214    match inner.as_rule() {
215        Rule::aliased_selector => {
216            let mut inner_pairs = inner.into_inner();
217            let first = inner_pairs.next().unwrap();
218            let second = inner_pairs.next().unwrap();
219
220            // Check if first token is a selector or identifier
221            // Grammar: selector ~ "as" ~ identifier | identifier ~ "=" ~ selector
222            match first.as_rule() {
223                Rule::selector => {
224                    // New syntax: selector as identifier
225                    let selector = parse_selector(first)?;
226                    let alias = second.as_str().to_string();
227                    Ok((selector, Some(alias)))
228                }
229                Rule::identifier => {
230                    // Old syntax: identifier = selector
231                    let alias = first.as_str().to_string();
232                    let selector = parse_selector(second)?;
233                    Ok((selector, Some(alias)))
234                }
235                _ => Err(DtransformError::ParseError("Invalid aliased selector".to_string()))
236            }
237        }
238        Rule::selector => {
239            let selector = parse_selector(inner)?;
240            Ok((selector, None))
241        }
242        _ => Err(DtransformError::ParseError("Invalid selector item".to_string()))
243    }
244}
245
246fn parse_selector(pair: pest::iterators::Pair<Rule>) -> Result<ColumnSelector> {
247    // If we have a selector wrapper, unwrap it
248    let actual_pair = if pair.as_rule() == Rule::selector {
249        pair.into_inner().next().unwrap()
250    } else {
251        pair
252    };
253
254    match actual_pair.as_rule() {
255        Rule::column_ref => {
256            let inner = actual_pair.into_inner().next().unwrap();
257            match inner.as_rule() {
258                Rule::positional_column => {
259                    // $1, $2, etc. - AWK-style (1-based)
260                    let text = inner.as_str();
261                    let num_str = &text[1..]; // Skip the '$'
262                    let position = parse_number_as_usize(num_str)?;
263                    if position == 0 {
264                        return Err(DtransformError::ParseError(
265                            "Positional columns start at $1, not $0".to_string()
266                        ));
267                    }
268                    // For selector, convert to 0-based index
269                    Ok(ColumnSelector::Index(position - 1))
270                }
271                Rule::identifier => Ok(ColumnSelector::Name(inner.as_str().to_string())),
272                _ => Err(DtransformError::ParseError("Invalid column reference".to_string()))
273            }
274        }
275        Rule::regex_selector => {
276            let pattern = parse_string(actual_pair.into_inner().next().unwrap())?;
277            Ok(ColumnSelector::Regex(pattern))
278        }
279        Rule::positional_range => {
280            let mut inner_pairs = actual_pair.into_inner();
281            let start_pair = inner_pairs.next().unwrap();
282            let end_pair = inner_pairs.next().unwrap();
283
284            // Parse $N format
285            let start_str = start_pair.as_str();
286            let start_num = parse_number_as_usize(&start_str[1..])?; // Skip '$'
287            if start_num == 0 {
288                return Err(DtransformError::ParseError(
289                    "Positional ranges start at $1, not $0".to_string()
290                ));
291            }
292
293            let end_str = end_pair.as_str();
294            let end_num = parse_number_as_usize(&end_str[1..])?; // Skip '$'
295            if end_num == 0 {
296                return Err(DtransformError::ParseError(
297                    "Positional ranges start at $1, not $0".to_string()
298                ));
299            }
300
301            // Convert to 0-based indices
302            Ok(ColumnSelector::Range(start_num - 1, end_num - 1))
303        }
304        Rule::type_selector => {
305            let mut types = Vec::new();
306            for type_pair in actual_pair.into_inner() {
307                if type_pair.as_rule() == Rule::type_list {
308                    for data_type_pair in type_pair.into_inner() {
309                        types.push(parse_data_type(data_type_pair)?);
310                    }
311                }
312            }
313            Ok(ColumnSelector::Type(types))
314        }
315        Rule::except_selector => {
316            let inner = actual_pair.into_inner().next().unwrap();
317            let selector = parse_selector(inner)?;
318            Ok(ColumnSelector::Except(Box::new(selector)))
319        }
320        _ => Err(DtransformError::ParseError(format!("Unknown selector: {:?}", actual_pair.as_rule())))
321    }
322}
323
324fn parse_data_type(pair: pest::iterators::Pair<Rule>) -> Result<DataType> {
325    match pair.as_str() {
326        "Number" => Ok(DataType::Number),
327        "String" => Ok(DataType::String),
328        "Boolean" => Ok(DataType::Boolean),
329        "Date" => Ok(DataType::Date),
330        "DateTime" => Ok(DataType::DateTime),
331        _ => Err(DtransformError::ParseError("Invalid data type".to_string()))
332    }
333}
334
335fn parse_filter_op(pair: pest::iterators::Pair<Rule>) -> Result<FilterOp> {
336    let condition = parse_expression(pair.into_inner().next().unwrap())?;
337    Ok(FilterOp { condition })
338}
339
340fn parse_mutate_op(pair: pest::iterators::Pair<Rule>) -> Result<MutateOp> {
341    let mut assignments = Vec::new();
342
343    for inner_pair in pair.into_inner() {
344        if inner_pair.as_rule() == Rule::assignment_list {
345            for assignment_pair in inner_pair.into_inner() {
346                assignments.push(parse_assignment(assignment_pair)?);
347            }
348        }
349    }
350
351    Ok(MutateOp { assignments })
352}
353
354fn parse_assignment(pair: pest::iterators::Pair<Rule>) -> Result<Assignment> {
355    use crate::parser::ast::AssignmentTarget;
356
357    let mut inner_pairs = pair.into_inner();
358    let column_pair = inner_pairs.next().unwrap();
359
360    let column = match column_pair.as_rule() {
361        Rule::identifier => AssignmentTarget::Name(column_pair.as_str().to_string()),
362        Rule::number => AssignmentTarget::Name(format!("col_{}", column_pair.as_str())),
363        Rule::positional_column => {
364            // $1, $2, etc.
365            let text = column_pair.as_str();
366            let num_str = &text[1..]; // Skip the '$'
367            let pos: usize = num_str.parse()
368                .map_err(|_| DtransformError::ParseError(format!("Invalid column number: {}", num_str)))?;
369
370            if pos == 0 {
371                return Err(DtransformError::ParseError(
372                    "Column positions must be 1-based (e.g., $1, $2, ...)".to_string()
373                ));
374            }
375
376            AssignmentTarget::Position(pos)
377        }
378        _ => return Err(DtransformError::ParseError("Invalid column in assignment".to_string()))
379    };
380
381    let expression = parse_expression(inner_pairs.next().unwrap())?;
382
383    Ok(Assignment { column, expression })
384}
385
386fn parse_rename_op(pair: pest::iterators::Pair<Rule>) -> Result<RenameOp> {
387    let mut mappings = Vec::new();
388
389    for inner_pair in pair.into_inner() {
390        if inner_pair.as_rule() == Rule::rename_mapping_list {
391            for mapping_pair in inner_pair.into_inner() {
392                let mut mapping_inner = mapping_pair.into_inner();
393                let col_ref = parse_column_ref(mapping_inner.next().unwrap())?;
394                let new_name_pair = mapping_inner.next().unwrap();
395
396                let new_name = match new_name_pair.as_rule() {
397                    Rule::identifier => new_name_pair.as_str().to_string(),
398                    Rule::string => parse_string(new_name_pair)?,
399                    _ => return Err(DtransformError::ParseError("Invalid new name in rename".to_string()))
400                };
401
402                mappings.push((col_ref, new_name));
403            }
404        }
405    }
406
407    Ok(RenameOp { mappings })
408}
409
410fn parse_rename_all_op(pair: pest::iterators::Pair<Rule>) -> Result<RenameAllOp> {
411    let strategy_pair = pair.into_inner().next().unwrap();
412    let strategy = parse_rename_strategy(strategy_pair)?;
413    Ok(RenameAllOp { strategy })
414}
415
416fn parse_rename_strategy(pair: pest::iterators::Pair<Rule>) -> Result<RenameStrategy> {
417    let inner = pair.into_inner().next().unwrap();
418
419    match inner.as_rule() {
420        Rule::replace_strategy => {
421            let mut inner_pairs = inner.into_inner();
422            let old = parse_string(inner_pairs.next().unwrap())?;
423            let new = parse_string(inner_pairs.next().unwrap())?;
424            Ok(RenameStrategy::Replace { old, new })
425        }
426        Rule::sequential_strategy => {
427            let mut inner_pairs = inner.into_inner();
428            let prefix = parse_string(inner_pairs.next().unwrap())?;
429            let start = parse_number_as_usize(inner_pairs.next().unwrap().as_str())?;
430            let end = parse_number_as_usize(inner_pairs.next().unwrap().as_str())?;
431            Ok(RenameStrategy::Sequential { prefix, start, end })
432        }
433        _ => Err(DtransformError::ParseError("Unknown rename strategy".to_string()))
434    }
435}
436
437fn parse_sort_op(pair: pest::iterators::Pair<Rule>) -> Result<SortOp> {
438    let mut columns = Vec::new();
439
440    for inner_pair in pair.into_inner() {
441        if inner_pair.as_rule() == Rule::sort_column_list {
442            for sort_col_pair in inner_pair.into_inner() {
443                let mut sort_col_inner = sort_col_pair.into_inner();
444                let col_ref = parse_column_ref(sort_col_inner.next().unwrap())?;
445
446                let descending = if let Some(order_pair) = sort_col_inner.next() {
447                    order_pair.as_str() == "desc"
448                } else {
449                    false
450                };
451
452                columns.push((col_ref, descending));
453            }
454        }
455    }
456
457    Ok(SortOp { columns })
458}
459
460fn parse_take_op(pair: pest::iterators::Pair<Rule>) -> Result<TakeOp> {
461    let n = parse_number_as_usize(pair.into_inner().next().unwrap().as_str())?;
462    Ok(TakeOp { n })
463}
464
465fn parse_skip_op(pair: pest::iterators::Pair<Rule>) -> Result<SkipOp> {
466    let n = parse_number_as_usize(pair.into_inner().next().unwrap().as_str())?;
467    Ok(SkipOp { n })
468}
469
470fn parse_slice_op(pair: pest::iterators::Pair<Rule>) -> Result<SliceOp> {
471    let mut inner_pairs = pair.into_inner();
472    let start = parse_number_as_usize(inner_pairs.next().unwrap().as_str())?;
473    let end = parse_number_as_usize(inner_pairs.next().unwrap().as_str())?;
474    Ok(SliceOp { start, end })
475}
476
477fn parse_drop_op(pair: pest::iterators::Pair<Rule>) -> Result<DropOp> {
478    let mut columns = Vec::new();
479
480    for inner_pair in pair.into_inner() {
481        if inner_pair.as_rule() == Rule::selector_list {
482            for selector_item_pair in inner_pair.into_inner() {
483                let (selector, _alias) = parse_selector_item(selector_item_pair)?;
484                columns.push(selector);
485            }
486        }
487    }
488
489    Ok(DropOp { columns })
490}
491
492fn parse_distinct_op(pair: pest::iterators::Pair<Rule>) -> Result<DistinctOp> {
493    let mut columns = None;
494
495    for inner_pair in pair.into_inner() {
496        if inner_pair.as_rule() == Rule::selector_list {
497            let mut selectors = Vec::new();
498            for selector_item_pair in inner_pair.into_inner() {
499                let (selector, _alias) = parse_selector_item(selector_item_pair)?;
500                selectors.push(selector);
501            }
502            columns = Some(selectors);
503        }
504    }
505
506    Ok(DistinctOp { columns })
507}
508
509fn parse_column_ref(pair: pest::iterators::Pair<Rule>) -> Result<ColumnRef> {
510    let inner = pair.into_inner().next().unwrap();
511
512    match inner.as_rule() {
513        Rule::positional_column => {
514            // $1, $2, etc. - AWK-style (1-based)
515            let text = inner.as_str();
516            let num_str = &text[1..]; // Skip the '$'
517            let position = parse_number_as_usize(num_str)?;
518            if position == 0 {
519                return Err(DtransformError::ParseError(
520                    "Positional columns start at $1, not $0".to_string()
521                ));
522            }
523            Ok(ColumnRef::Position(position))
524        }
525        Rule::identifier => Ok(ColumnRef::Name(inner.as_str().to_string())),
526        _ => Err(DtransformError::ParseError("Invalid column reference".to_string()))
527    }
528}
529
530fn parse_expression(pair: pest::iterators::Pair<Rule>) -> Result<Expression> {
531    match pair.as_rule() {
532        Rule::expression | Rule::logical_or | Rule::logical_and | Rule::comparison | Rule::term | Rule::factor => {
533            let mut pairs = pair.into_inner();
534            let first = pairs.next().unwrap();
535            let mut left = parse_expression(first)?;
536
537            while let Some(op_pair) = pairs.next() {
538                let op = match op_pair.as_rule() {
539                    Rule::comparison_op | Rule::add_op | Rule::sub_op | Rule::mul_op | Rule::div_op => {
540                        parse_bin_op(op_pair.as_str())?
541                    }
542                    _ if op_pair.as_str() == "and" || op_pair.as_str() == "or" => {
543                        parse_bin_op(op_pair.as_str())?
544                    }
545                    _ => {
546                        // This is the right operand
547                        let right = parse_expression(op_pair)?;
548                        return Ok(Expression::BinaryOp {
549                            left: Box::new(left),
550                            op: BinOp::Add, // This shouldn't happen
551                            right: Box::new(right),
552                        });
553                    }
554                };
555
556                let right_pair = pairs.next().unwrap();
557                let right = parse_expression(right_pair)?;
558
559                left = Expression::BinaryOp {
560                    left: Box::new(left),
561                    op,
562                    right: Box::new(right),
563                };
564            }
565
566            Ok(left)
567        }
568        Rule::primary => {
569            let inner = pair.into_inner().next().unwrap();
570            parse_expression(inner)
571        }
572        Rule::invalid_split => {
573            return Err(DtransformError::ParseError(
574                "split() must be followed by [index]. Example: split(text, ':')[0]".to_string()
575            ));
576        }
577        Rule::split_call => parse_split_call(pair),
578        Rule::lookup_call => parse_lookup_call(pair),
579        Rule::replace_call => parse_replace_call(pair),
580        Rule::regex_literal => {
581            let pattern = parse_string(pair.into_inner().next().unwrap())?;
582            Ok(Expression::Regex(pattern))
583        }
584        Rule::method_call => parse_method_call(pair),
585        Rule::positional_column => {
586            // $1, $2, etc. - AWK-style (1-based)
587            let text = pair.as_str();
588            let num_str = &text[1..]; // Skip the '$'
589            let position = parse_number_as_usize(num_str)?;
590            if position == 0 {
591                return Err(DtransformError::ParseError(
592                    "Positional columns start at $1, not $0".to_string()
593                ));
594            }
595            Ok(Expression::Column(ColumnRef::Position(position)))
596        }
597        Rule::column_ref => {
598            let col_ref = parse_column_ref(pair)?;
599            Ok(Expression::Column(col_ref))
600        }
601        Rule::list_literal => {
602            // Parse list literal: ['a', 'b', 'c'] or [1, 2, 3]
603            let mut literals = Vec::new();
604            for inner in pair.into_inner() {
605                if inner.as_rule() == Rule::literal_list {
606                    for literal_pair in inner.into_inner() {
607                        literals.push(parse_literal(literal_pair)?);
608                    }
609                }
610            }
611            Ok(Expression::List(literals))
612        }
613        Rule::literal => parse_literal_expression(pair),
614        Rule::boolean => {
615            let val = pair.as_str() == "true";
616            Ok(Expression::Literal(Literal::Boolean(val)))
617        }
618        Rule::null => Ok(Expression::Literal(Literal::Null)),
619        Rule::number => {
620            let val = parse_number(pair.as_str())?;
621            Ok(Expression::Literal(Literal::Number(val)))
622        }
623        Rule::string => {
624            let val = parse_string(pair)?;
625            Ok(Expression::Literal(Literal::String(val)))
626        }
627        Rule::identifier => {
628            Ok(Expression::Column(ColumnRef::Name(pair.as_str().to_string())))
629        }
630        _ => {
631            // Try to parse as expression recursively
632            let rule = pair.as_rule();
633            if let Some(inner) = pair.into_inner().next() {
634                parse_expression(inner)
635            } else {
636                Err(DtransformError::ParseError(format!("Unknown expression type: {:?}", rule)))
637            }
638        }
639    }
640}
641
642fn parse_split_call(pair: pest::iterators::Pair<Rule>) -> Result<Expression> {
643    let mut inner_pairs = pair.into_inner();
644
645    // Parse string expression
646    let string_expr = parse_expression(inner_pairs.next().unwrap())?;
647
648    // Parse delimiter expression
649    let delimiter_expr = parse_expression(inner_pairs.next().unwrap())?;
650
651    // Parse index (0-based)
652    let index_pair = inner_pairs.next().unwrap();
653    let index = parse_number_as_usize(index_pair.as_str())?;
654
655    Ok(Expression::Split {
656        string: Box::new(string_expr),
657        delimiter: Box::new(delimiter_expr),
658        index,
659    })
660}
661
662fn parse_lookup_call(pair: pest::iterators::Pair<Rule>) -> Result<Expression> {
663    let mut inner_pairs = pair.into_inner();
664
665    // Parse table name (identifier)
666    let table = inner_pairs.next().unwrap().as_str().to_string();
667
668    // Parse key expression
669    let key_expr = parse_expression(inner_pairs.next().unwrap())?;
670
671    // Parse 'on' field (string or column_ref)
672    let on = parse_lookup_field(inner_pairs.next().unwrap())?;
673
674    // Parse 'return' field (string or column_ref)
675    let return_field = parse_lookup_field(inner_pairs.next().unwrap())?;
676
677    Ok(Expression::Lookup {
678        table,
679        key: Box::new(key_expr),
680        on,
681        return_field,
682    })
683}
684
685fn parse_replace_call(pair: pest::iterators::Pair<Rule>) -> Result<Expression> {
686    let mut inner_pairs = pair.into_inner();
687
688    // Parse text expression (the string/column to perform replacement on)
689    let text_expr = parse_expression(inner_pairs.next().unwrap())?;
690
691    // Parse old expression (pattern to replace)
692    let old_expr = parse_expression(inner_pairs.next().unwrap())?;
693
694    // Parse new expression (replacement text)
695    let new_expr = parse_expression(inner_pairs.next().unwrap())?;
696
697    Ok(Expression::Replace {
698        text: Box::new(text_expr),
699        old: Box::new(old_expr),
700        new: Box::new(new_expr),
701    })
702}
703
704fn parse_lookup_field(pair: pest::iterators::Pair<Rule>) -> Result<crate::parser::ast::LookupField> {
705    use crate::parser::ast::LookupField;
706
707    let inner = pair.into_inner().next().unwrap();
708
709    match inner.as_rule() {
710        Rule::string => {
711            let name = parse_string(inner)?;
712            Ok(LookupField::Name(name))
713        }
714        Rule::column_ref => {
715            let col_inner = inner.into_inner().next().unwrap();
716            match col_inner.as_rule() {
717                Rule::positional_column => {
718                    // $1, $2, etc.
719                    let text = col_inner.as_str();
720                    let num_str = &text[1..]; // Skip the '$'
721                    let pos: usize = num_str.parse()
722                        .map_err(|_| DtransformError::ParseError(format!("Invalid column number: {}", num_str)))?;
723
724                    if pos == 0 {
725                        return Err(DtransformError::ParseError(
726                            "Column positions must be 1-based (e.g., $1, $2, ...)".to_string()
727                        ));
728                    }
729
730                    Ok(LookupField::Position(pos))
731                }
732                Rule::identifier => {
733                    // Named column
734                    let name = col_inner.as_str().to_string();
735                    Ok(LookupField::Name(name))
736                }
737                _ => Err(DtransformError::ParseError(format!(
738                    "Unexpected column reference type: {:?}",
739                    col_inner.as_rule()
740                )))
741            }
742        }
743        _ => Err(DtransformError::ParseError(format!(
744            "Expected string or column reference, got: {:?}",
745            inner.as_rule()
746        )))
747    }
748}
749
750fn parse_method_call(pair: pest::iterators::Pair<Rule>) -> Result<Expression> {
751    let mut inner_pairs = pair.into_inner();
752    let object_pair = inner_pairs.next().unwrap();
753
754    let mut object = match object_pair.as_rule() {
755        Rule::identifier => Expression::Column(ColumnRef::Name(object_pair.as_str().to_string())),
756        Rule::column_ref => {
757            let col_ref = parse_column_ref(object_pair)?;
758            Expression::Column(col_ref)
759        }
760        _ => parse_expression(object_pair)?
761    };
762
763    // Handle chained method calls
764    while let Some(method_pair) = inner_pairs.next() {
765        if method_pair.as_rule() == Rule::identifier {
766            let method = method_pair.as_str().to_string();
767
768            let mut args = Vec::new();
769            if let Some(arg_list_pair) = inner_pairs.next() {
770                if arg_list_pair.as_rule() == Rule::arg_list {
771                    for arg_pair in arg_list_pair.into_inner() {
772                        args.push(parse_expression(arg_pair)?);
773                    }
774                }
775            }
776
777            object = Expression::MethodCall {
778                object: Box::new(object),
779                method,
780                args,
781            };
782        }
783    }
784
785    Ok(object)
786}
787
788fn parse_literal(pair: pest::iterators::Pair<Rule>) -> Result<Literal> {
789    let inner = if pair.as_rule() == Rule::literal {
790        pair.into_inner().next().unwrap()
791    } else {
792        pair
793    };
794
795    match inner.as_rule() {
796        Rule::boolean => {
797            let val = inner.as_str() == "true";
798            Ok(Literal::Boolean(val))
799        }
800        Rule::null => Ok(Literal::Null),
801        Rule::number => {
802            let val = parse_number(inner.as_str())?;
803            Ok(Literal::Number(val))
804        }
805        Rule::string => {
806            let val = parse_string(inner)?;
807            Ok(Literal::String(val))
808        }
809        _ => Err(DtransformError::ParseError("Invalid literal".to_string()))
810    }
811}
812
813fn parse_literal_expression(pair: pest::iterators::Pair<Rule>) -> Result<Expression> {
814    parse_literal(pair).map(Expression::Literal)
815}
816
817fn parse_bin_op(op_str: &str) -> Result<BinOp> {
818    match op_str {
819        "+" => Ok(BinOp::Add),
820        "-" => Ok(BinOp::Sub),
821        "*" => Ok(BinOp::Mul),
822        "/" => Ok(BinOp::Div),
823        ">" => Ok(BinOp::Gt),
824        "<" => Ok(BinOp::Lt),
825        ">=" => Ok(BinOp::Gte),
826        "<=" => Ok(BinOp::Lte),
827        "==" => Ok(BinOp::Eq),
828        "!=" => Ok(BinOp::Neq),
829        "and" => Ok(BinOp::And),
830        "or" => Ok(BinOp::Or),
831        "in" => Ok(BinOp::In),
832        _ => Err(DtransformError::ParseError(format!("Unknown operator: {}", op_str)))
833    }
834}
835
836fn parse_string(pair: pest::iterators::Pair<Rule>) -> Result<String> {
837    let inner = pair.into_inner().next().unwrap();
838    let s = inner.as_str();
839
840    // Unescape common escape sequences
841    let unescaped = s
842        .replace("\\n", "\n")
843        .replace("\\r", "\r")
844        .replace("\\t", "\t")
845        .replace("\\\"", "\"")
846        .replace("\\'", "'")
847        .replace("\\\\", "\\");
848
849    Ok(unescaped)
850}
851
852fn parse_param_value(pair: pest::iterators::Pair<Rule>) -> Result<String> {
853    match pair.as_rule() {
854        Rule::param_value => {
855            // param_value wraps the actual value, unwrap it
856            let inner = pair.into_inner().next().unwrap();
857            parse_param_value(inner)
858        }
859        Rule::string => parse_string(pair),
860        Rule::number => Ok(pair.as_str().to_string()),
861        Rule::boolean => Ok(pair.as_str().to_string()),
862        Rule::identifier => Ok(pair.as_str().to_string()),
863        _ => Err(DtransformError::ParseError(format!("Invalid parameter value: {:?}", pair.as_rule())))
864    }
865}
866
867fn parse_number(s: &str) -> Result<f64> {
868    // Handle suffixes (k, m, b)
869    let multiplier = if s.ends_with('k') || s.ends_with('K') {
870        1000.0
871    } else if s.ends_with('m') || s.ends_with('M') {
872        1_000_000.0
873    } else if s.ends_with('b') || s.ends_with('B') {
874        1_000_000_000.0
875    } else {
876        1.0
877    };
878
879    let num_str = if multiplier != 1.0 {
880        &s[..s.len() - 1]
881    } else {
882        s
883    };
884
885    num_str.parse::<f64>()
886        .map(|n| n * multiplier)
887        .map_err(|_| DtransformError::ParseError(format!("Invalid number: {}", s)))
888}
889
890fn parse_number_as_usize(s: &str) -> Result<usize> {
891    parse_number(s).and_then(|n| {
892        if n < 0.0 || n.fract() != 0.0 {
893            Err(DtransformError::ParseError(format!("Expected positive integer, got: {}", s)))
894        } else {
895            Ok(n as usize)
896        }
897    })
898}